This is an automated email from the ASF dual-hosted git repository.

pbacsko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/yunikorn-core.git


The following commit(s) were added to refs/heads/master by this push:
     new d2d7ce89 [YUNIKORN-1774] Event cache: misc cleanup (#559)
d2d7ce89 is described below

commit d2d7ce89457cbbce140e2737098ae62f6a9d8fee
Author: Peter Bacsko <[email protected]>
AuthorDate: Fri Jun 9 10:41:56 2023 +0200

    [YUNIKORN-1774] Event cache: misc cleanup (#559)
    
    Closes: #559
    
    Signed-off-by: Peter Bacsko <[email protected]>
---
 pkg/entrypoint/entrypoint.go                   |  23 ++--
 pkg/events/event_cache_test.go                 | 164 -------------------------
 pkg/events/event_publisher.go                  |  33 ++---
 pkg/events/event_publisher_test.go             |  14 ++-
 pkg/events/event_store.go                      |  18 +--
 pkg/events/event_store_test.go                 |   4 +-
 pkg/events/{event_cache.go => event_system.go} |  57 +++++----
 pkg/events/event_system_test.go                |  90 ++++++++++++++
 pkg/scheduler/objects/application.go           |  26 +---
 pkg/scheduler/objects/events.go                |  72 +++++++++++
 pkg/scheduler/objects/events_test.go           | 111 +++++++++++++++++
 pkg/scheduler/partition_test.go                |  22 ++--
 12 files changed, 357 insertions(+), 277 deletions(-)

diff --git a/pkg/entrypoint/entrypoint.go b/pkg/entrypoint/entrypoint.go
index 2fee2dc2..2a42a989 100644
--- a/pkg/entrypoint/entrypoint.go
+++ b/pkg/entrypoint/entrypoint.go
@@ -36,7 +36,7 @@ type startupOptions struct {
        manualScheduleFlag bool
        startWebAppFlag    bool
        metricsHistorySize int
-       eventCacheEnabled  bool
+       eventSystemEnabled bool
 }
 
 func StartAllServices() *ServiceContext {
@@ -46,7 +46,7 @@ func StartAllServices() *ServiceContext {
                        manualScheduleFlag: false,
                        startWebAppFlag:    true,
                        metricsHistorySize: 1440,
-                       eventCacheEnabled:  false,
+                       eventSystemEnabled: false,
                })
 }
 
@@ -63,18 +63,15 @@ func StartAllServicesWithManualScheduler() *ServiceContext {
                        manualScheduleFlag: true,
                        startWebAppFlag:    false,
                        metricsHistorySize: 0,
-                       eventCacheEnabled:  false,
+                       eventSystemEnabled: false,
                })
 }
 
 func startAllServicesWithParameters(opts startupOptions) *ServiceContext {
-       var eventCache *events.EventCache
-       var eventPublisher events.EventPublisher
-       if opts.eventCacheEnabled {
-               log.Logger().Info("creating event cache")
-               events.CreateAndSetEventCache()
-               eventCache = events.GetEventCache()
-               eventPublisher = events.CreateShimPublisher(eventCache.Store)
+       if opts.eventSystemEnabled {
+               log.Logger().Info("creating and starting event system")
+               events.CreateAndSetEventSystem()
+               events.GetEventSystem().StartService()
        }
 
        sched := scheduler.NewScheduler()
@@ -89,12 +86,6 @@ func startAllServicesWithParameters(opts startupOptions) 
*ServiceContext {
        log.Logger().Info("ServiceContext start scheduling services")
        sched.StartService(eventHandler, opts.manualScheduleFlag)
        proxy.StartService(eventHandler)
-       if opts.eventCacheEnabled && eventCache != nil {
-               eventCache.StartService()
-               if eventPublisher != nil {
-                       eventPublisher.StartService()
-               }
-       }
 
        context := &ServiceContext{
                RMProxy:   proxy,
diff --git a/pkg/events/event_cache_test.go b/pkg/events/event_cache_test.go
deleted file mode 100644
index f7a72754..00000000
--- a/pkg/events/event_cache_test.go
+++ /dev/null
@@ -1,164 +0,0 @@
-/*
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements.  See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership.  The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License.  You may obtain a copy of the License at
-
-     http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
-*/
-
-package events
-
-import (
-       "sync"
-       "testing"
-       "time"
-
-       "go.uber.org/zap"
-
-       "gotest.tools/v3/assert"
-
-       "github.com/apache/yunikorn-core/pkg/common"
-       "github.com/apache/yunikorn-core/pkg/log"
-       "github.com/apache/yunikorn-scheduler-interface/lib/go/si"
-)
-
-// the EventCache should be nil by default, until not set by 
CreateAndSetEventCache()
-func TestGetEventCache(t *testing.T) {
-       cache := GetEventCache()
-       assert.Assert(t, cache == nil, "the cache should be nil by default")
-       CreateAndSetEventCache()
-       cache = GetEventCache()
-       assert.Assert(t, cache != nil, "the cache should not be nil")
-}
-
-// StartService() and Stop() must not cause panic
-func TestSimpleStartAndStop(t *testing.T) {
-       CreateAndSetEventCache()
-       cache := GetEventCache()
-       // adding event to stopped cache does not cause panic
-       cache.AddEvent(nil)
-       cache.StartService()
-       // add an event
-       cache.AddEvent(nil)
-       cache.Stop()
-       // adding event to stopped cache does not cause panic
-       cache.AddEvent(nil)
-}
-
-// if an EventRecord is added to the EventCache, the same record
-// should be retrieved from the EventStore
-func TestSingleEventStoredCorrectly(t *testing.T) {
-       CreateAndSetEventCache()
-       cache := GetEventCache()
-       cache.StartService()
-
-       event := si.EventRecord{
-               Type:     si.EventRecord_REQUEST,
-               ObjectID: "alloc1",
-               GroupID:  "app1",
-               Reason:   "reason",
-               Message:  "message",
-       }
-       cache.AddEvent(&event)
-
-       // wait for events to be processed
-       err := common.WaitFor(1*time.Millisecond, 10*time.Millisecond, func() 
bool {
-               return cache.Store.CountStoredEvents() == 1
-       })
-       assert.NilError(t, err, "the event should have been processed")
-
-       records := cache.Store.CollectEvents()
-       if records == nil {
-               t.Fatal("collecting eventChannel should return something")
-       }
-       assert.Equal(t, len(records), 1)
-       record := records[0]
-       assert.Equal(t, record.Type, si.EventRecord_REQUEST)
-       assert.Equal(t, record.ObjectID, "alloc1")
-       assert.Equal(t, record.GroupID, "app1")
-       assert.Equal(t, record.Message, "message")
-       assert.Equal(t, record.Reason, "reason")
-}
-
-type slowEventStore struct {
-       busy bool
-
-       sync.Mutex
-}
-
-func (ses *slowEventStore) setBusy(b bool) {
-       ses.Lock()
-       defer ses.Unlock()
-
-       log.Logger().Info("setting busy to ", zap.Bool("bool", b))
-
-       ses.busy = b
-}
-
-func (ses *slowEventStore) getBusy() bool {
-       ses.Lock()
-       defer ses.Unlock()
-
-       log.Logger().Info("getting busy ", zap.Bool("bool", ses.busy))
-
-       return ses.busy
-}
-
-func (ses *slowEventStore) Store(*si.EventRecord) {
-       ses.setBusy(true)
-       defer ses.setBusy(false)
-
-       time.Sleep(50 * time.Millisecond)
-}
-
-func (ses *slowEventStore) CollectEvents() []*si.EventRecord {
-       return nil
-}
-
-func (ses *slowEventStore) CountStoredEvents() int {
-       return 0
-}
-
-// this test checks that if storing events is much slower
-// than the rate the events are generated, it doesn't cause
-// panic by filling up the EventChannel
-func TestSlowChannelFillingUpEventChannel(t *testing.T) {
-       defaultEventChannelSize = 2
-
-       store := slowEventStore{}
-       cache := createEventCacheInternal(&store)
-
-       cache.StartService()
-
-       event := si.EventRecord{
-               Type:     si.EventRecord_REQUEST,
-               ObjectID: "alloc1",
-               GroupID:  "app1",
-               Reason:   "reason",
-               Message:  "message",
-       }
-
-       cache.AddEvent(&event)
-
-       // wait for events to be occupy the slow store
-       err := common.WaitFor(1*time.Millisecond, 100*time.Millisecond, 
store.getBusy)
-       assert.NilError(t, err, "the event should have been processed")
-       assert.Equal(t, len(cache.channel), 0, "the number of queued elements 
should be zero")
-
-       cache.AddEvent(&event)
-       assert.Equal(t, len(cache.channel), 1, "the number of queued elements 
should be two")
-       cache.AddEvent(&event)
-       assert.Equal(t, len(cache.channel), 2, "the number of queued elements 
should be two")
-       cache.AddEvent(&event)
-       assert.Equal(t, len(cache.channel), 2, "the number of queued elements 
should be two")
-}
diff --git a/pkg/events/event_publisher.go b/pkg/events/event_publisher.go
index ba1e45e2..6225a1ee 100644
--- a/pkg/events/event_publisher.go
+++ b/pkg/events/event_publisher.go
@@ -31,38 +31,25 @@ import (
 // stores the push event internal
 var defaultPushEventInterval = 2 * time.Second
 
-type EventPublisher interface {
-       StartService()
-       Stop()
-}
-
-type shimPublisher struct {
-       store             EventStore
+type EventPublisher struct {
+       store             *EventStore
        pushEventInterval time.Duration
-       stop              atomic.Value
-}
-
-func CreateShimPublisher(store EventStore) EventPublisher {
-       return createShimPublisherInternal(store)
-}
-
-func createShimPublisherInternal(store EventStore) *shimPublisher {
-       return createShimPublisherWithParameters(store, 
defaultPushEventInterval)
+       stop              atomic.Bool
 }
 
-func createShimPublisherWithParameters(store EventStore, pushEventInterval 
time.Duration) *shimPublisher {
-       publisher := &shimPublisher{
+func CreateShimPublisher(store *EventStore) *EventPublisher {
+       publisher := &EventPublisher{
                store:             store,
-               pushEventInterval: pushEventInterval,
+               pushEventInterval: defaultPushEventInterval,
        }
        publisher.stop.Store(false)
        return publisher
 }
 
-func (sp *shimPublisher) StartService() {
+func (sp *EventPublisher) StartService() {
        go func() {
                for {
-                       if sp.stop.Load().(bool) {
+                       if sp.stop.Load() {
                                break
                        }
                        messages := sp.store.CollectEvents()
@@ -77,10 +64,10 @@ func (sp *shimPublisher) StartService() {
        }()
 }
 
-func (sp *shimPublisher) Stop() {
+func (sp *EventPublisher) Stop() {
        sp.stop.Store(true)
 }
 
-func (sp *shimPublisher) getEventStore() EventStore {
+func (sp *EventPublisher) getEventStore() *EventStore {
        return sp.store
 }
diff --git a/pkg/events/event_publisher_test.go 
b/pkg/events/event_publisher_test.go
index 85d87cb4..b9d61e97 100644
--- a/pkg/events/event_publisher_test.go
+++ b/pkg/events/event_publisher_test.go
@@ -79,8 +79,8 @@ func TestCreateShimPublisher(t *testing.T) {
 
 // StartService() and Stop() functions should not cause panic
 func TestServiceStartStopInternal(t *testing.T) {
-       store := newEventStoreImpl()
-       publisher := createShimPublisherInternal(store)
+       store := newEventStore()
+       publisher := CreateShimPublisher(store)
        publisher.StartService()
        defer publisher.Stop()
        assert.Equal(t, publisher.getEventStore(), store)
@@ -89,8 +89,9 @@ func TestServiceStartStopInternal(t *testing.T) {
 func TestNoFillWithoutEventPluginRegistered(t *testing.T) {
        pushEventInterval := 2 * time.Millisecond
 
-       store := newEventStoreImpl()
-       publisher := createShimPublisherWithParameters(store, pushEventInterval)
+       store := newEventStore()
+       publisher := CreateShimPublisher(store)
+       publisher.pushEventInterval = pushEventInterval
        publisher.StartService()
        defer publisher.Stop()
 
@@ -116,8 +117,9 @@ func TestPublisherSendsEvent(t *testing.T) {
        eventPlugin, err := createEventPluginForTest()
        assert.NilError(t, err, "could not create event plugin for test")
 
-       store := newEventStoreImpl()
-       publisher := createShimPublisherWithParameters(store, pushEventInterval)
+       store := newEventStore()
+       publisher := CreateShimPublisher(store)
+       publisher.pushEventInterval = pushEventInterval
        publisher.StartService()
        defer publisher.Stop()
 
diff --git a/pkg/events/event_store.go b/pkg/events/event_store.go
index 8625a641..cd2cc603 100644
--- a/pkg/events/event_store.go
+++ b/pkg/events/event_store.go
@@ -33,25 +33,19 @@ var defaultEventStoreSize = 1000
 //
 // Assuming the rate of events generated by the scheduler component in a given 
time period
 // is high, calling CollectEvents() periodically should be fine.
-type EventStore interface {
-       Store(event *si.EventRecord)
-       CollectEvents() []*si.EventRecord
-       CountStoredEvents() int
-}
-
-type defaultEventStore struct {
+type EventStore struct {
        events []*si.EventRecord
        idx    int // points where to store the next event
        sync.RWMutex
 }
 
-func newEventStoreImpl() EventStore {
-       return &defaultEventStore{
+func newEventStore() *EventStore {
+       return &EventStore{
                events: make([]*si.EventRecord, defaultEventStoreSize),
        }
 }
 
-func (es *defaultEventStore) Store(event *si.EventRecord) {
+func (es *EventStore) Store(event *si.EventRecord) {
        es.Lock()
        defer es.Unlock()
 
@@ -65,7 +59,7 @@ func (es *defaultEventStore) Store(event *si.EventRecord) {
        metrics.GetEventMetrics().IncEventsStored()
 }
 
-func (es *defaultEventStore) CollectEvents() []*si.EventRecord {
+func (es *EventStore) CollectEvents() []*si.EventRecord {
        es.Lock()
        defer es.Unlock()
 
@@ -77,7 +71,7 @@ func (es *defaultEventStore) CollectEvents() 
[]*si.EventRecord {
        return messages
 }
 
-func (es *defaultEventStore) CountStoredEvents() int {
+func (es *EventStore) CountStoredEvents() int {
        es.RLock()
        defer es.RUnlock()
 
diff --git a/pkg/events/event_store_test.go b/pkg/events/event_store_test.go
index 5eb00bb4..93b46f30 100644
--- a/pkg/events/event_store_test.go
+++ b/pkg/events/event_store_test.go
@@ -29,7 +29,7 @@ import (
 
 // the fields of an event should match after stored and retrieved
 func TestStoreAndRetrieve(t *testing.T) {
-       store := newEventStoreImpl()
+       store := newEventStore()
        event := si.EventRecord{
                Type:     si.EventRecord_REQUEST,
                ObjectID: "alloc1",
@@ -58,7 +58,7 @@ func TestStoreAndRetrieve(t *testing.T) {
 func TestStoreWithLimitedSize(t *testing.T) {
        defaultEventStoreSize = 3
 
-       store := newEventStoreImpl()
+       store := newEventStore()
        for i := 0; i < 5; i++ {
                event := &si.EventRecord{
                        Type:     si.EventRecord_REQUEST,
diff --git a/pkg/events/event_cache.go b/pkg/events/event_system.go
similarity index 66%
rename from pkg/events/event_cache.go
rename to pkg/events/event_system.go
index baf5fd2e..304e0dd9 100644
--- a/pkg/events/event_cache.go
+++ b/pkg/events/event_system.go
@@ -29,39 +29,45 @@ import (
 // need to change for testing
 var defaultEventChannelSize = 100000
 
-var cache *EventCache
+var ev EventSystem
 
-type EventCache struct {
-       Store EventStore // storing eventChannel
+type EventSystem interface {
+       AddEvent(event *si.EventRecord)
+       StartService()
+       Stop()
+}
+
+type EventSystemImpl struct {
+       Store     *EventStore // storing eventChannel
+       publisher *EventPublisher
 
        channel chan *si.EventRecord // channelling input eventChannel
        stop    chan bool            // whether the service is stop
+       stopped bool
 
        sync.Mutex
 }
 
-func GetEventCache() *EventCache {
-       return cache
+func GetEventSystem() EventSystem {
+       return ev
 }
 
-func CreateAndSetEventCache() {
-       cache = createEventStoreAndCache()
+func CreateAndSetEventSystem() {
+       store := newEventStore()
+       ev = &EventSystemImpl{
+               Store:     store,
+               channel:   make(chan *si.EventRecord, defaultEventChannelSize),
+               stop:      make(chan bool),
+               stopped:   false,
+               publisher: CreateShimPublisher(store),
+       }
 }
 
-func createEventStoreAndCache() *EventCache {
-       store := newEventStoreImpl()
-       return createEventCacheInternal(store)
+func (ec *EventSystemImpl) StartService() {
+       ec.StartServiceWithPublisher(true)
 }
 
-func createEventCacheInternal(store EventStore) *EventCache {
-       return &EventCache{
-               Store:   store,
-               channel: make(chan *si.EventRecord, defaultEventChannelSize),
-               stop:    make(chan bool),
-       }
-}
-
-func (ec *EventCache) StartService() {
+func (ec *EventSystemImpl) StartServiceWithPublisher(withPublisher bool) {
        go func() {
                for {
                        select {
@@ -78,20 +84,29 @@ func (ec *EventCache) StartService() {
                        }
                }
        }()
+       if withPublisher {
+               ec.publisher.StartService()
+       }
 }
 
-func (ec *EventCache) Stop() {
+func (ec *EventSystemImpl) Stop() {
        ec.Lock()
        defer ec.Unlock()
 
+       if ec.stopped {
+               return
+       }
+
        ec.stop <- true
        if ec.channel != nil {
                close(ec.channel)
                ec.channel = nil
        }
+       ec.publisher.Stop()
+       ec.stopped = true
 }
 
-func (ec *EventCache) AddEvent(event *si.EventRecord) {
+func (ec *EventSystemImpl) AddEvent(event *si.EventRecord) {
        metrics.GetEventMetrics().IncEventsCreated()
        select {
        case ec.channel <- event:
diff --git a/pkg/events/event_system_test.go b/pkg/events/event_system_test.go
new file mode 100644
index 00000000..19dab280
--- /dev/null
+++ b/pkg/events/event_system_test.go
@@ -0,0 +1,90 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+package events
+
+import (
+       "testing"
+       "time"
+
+       "gotest.tools/v3/assert"
+
+       "github.com/apache/yunikorn-core/pkg/common"
+       "github.com/apache/yunikorn-scheduler-interface/lib/go/si"
+)
+
+// the EventSystem should be nil by default, until it is set by 
CreateAndSetEventSystem()
+func TestGetEventSystem(t *testing.T) {
+       eventSystem := GetEventSystem() // assumes no earlier test in this package called CreateAndSetEventSystem (ev is package-level state)
+       assert.Assert(t, eventSystem == nil, "the eventSystem should be nil by 
default")
+       CreateAndSetEventSystem()
+       eventSystem = GetEventSystem()
+       assert.Assert(t, eventSystem != nil, "the eventSystem should not be 
nil")
+}
+
+// StartService() and Stop() must not cause panic, and neither must a repeated Stop()
+func TestSimpleStartAndStop(t *testing.T) {
+       CreateAndSetEventSystem()
+       eventSystem := GetEventSystem()
+       // adding an event to a not yet started eventSystem does not cause panic
+       eventSystem.AddEvent(nil)
+       eventSystem.StartService()
+       defer eventSystem.Stop() // runs after the explicit Stop() below; Stop returns early once stopped is set
+       // add an event
+       eventSystem.AddEvent(nil)
+       eventSystem.Stop()
+       // adding event to stopped eventSystem does not cause panic
+       eventSystem.AddEvent(nil)
+}
+
+// if an EventRecord is added to the EventSystem, the same record
+// should be retrieved from the EventStore
+func TestSingleEventStoredCorrectly(t *testing.T) {
+       CreateAndSetEventSystem()
+       eventSystem := GetEventSystem().(*EventSystemImpl) //nolint:errcheck // unchecked assertion: panics if GetEventSystem no longer returns *EventSystemImpl
+       // don't run the publisher, because its periodic CollectEvents() could collect the event while we're 
waiting
+       eventSystem.StartServiceWithPublisher(false)
+       defer eventSystem.Stop()
+
+       event := si.EventRecord{
+               Type:     si.EventRecord_REQUEST,
+               ObjectID: "alloc1",
+               GroupID:  "app1",
+               Reason:   "reason",
+               Message:  "message",
+       }
+       eventSystem.AddEvent(&event)
+       // AddEvent only sends the record to the internal channel; it reaches the store asynchronously
+       // wait for events to be processed
+       err := common.WaitFor(time.Millisecond, time.Second, func() bool {
+               return eventSystem.Store.CountStoredEvents() == 1
+       })
+       assert.NilError(t, err, "the event should have been processed")
+
+       records := eventSystem.Store.CollectEvents()
+       if records == nil {
+               t.Fatal("collecting eventChannel should return something")
+       }
+       assert.Equal(t, len(records), 1)
+       record := records[0]
+       assert.Equal(t, record.Type, si.EventRecord_REQUEST)
+       assert.Equal(t, record.ObjectID, "alloc1")
+       assert.Equal(t, record.GroupID, "app1")
+       assert.Equal(t, record.Message, "message")
+       assert.Equal(t, record.Reason, "reason")
+}
diff --git a/pkg/scheduler/objects/application.go 
b/pkg/scheduler/objects/application.go
index 6fcbe4d2..9d061565 100644
--- a/pkg/scheduler/objects/application.go
+++ b/pkg/scheduler/objects/application.go
@@ -107,6 +107,7 @@ type Application struct {
        rmEventHandler     handler.EventHandler
        rmID               string
        terminatedCallback func(appID string)
+       appEvents          *applicationEvents
 
        sync.RWMutex
 }
@@ -196,6 +197,7 @@ func NewApplication(siApp *si.AddApplicationRequest, ugi 
security.UserGroup, eve
        app.user = ugi
        app.rmEventHandler = eventHandler
        app.rmID = rmID
+       app.appEvents = newApplicationEvents(app, events.GetEventSystem())
        return app
 }
 
@@ -960,17 +962,7 @@ func (sa *Application) tryAllocate(headRoom 
*resources.Resource, preemptionDelay
                                }
                        }
 
-                       // post scheduling events via the event plugin
-                       if eventCache := events.GetEventCache(); eventCache != 
nil {
-                               message := fmt.Sprintf("Application %s does not 
fit into %s queue", request.GetApplicationID(), sa.queuePath)
-                               if event, err := 
events.CreateRequestEventRecord(request.GetAllocationKey(), 
request.GetApplicationID(), "InsufficientQueueResources", message); err != nil {
-                                       log.Logger().Warn("Event creation 
failed",
-                                               zap.String("event message", 
message),
-                                               zap.Error(err))
-                               } else {
-                                       eventCache.AddEvent(event)
-                               }
-                       }
+                       sa.appEvents.sendAppDoesNotFitEvent(request)
                        continue
                }
 
@@ -1126,17 +1118,7 @@ func (sa *Application) 
tryPlaceholderAllocate(nodeIterator func() NodeIterator,
                                // release the placeholder and tell the RM
                                ph.SetReleased(true)
                                sa.notifyRMAllocationReleased(sa.rmID, 
[]*Allocation{ph}, si.TerminationType_TIMEOUT, "cancel placeholder: resource 
incompatible")
-                               // add an event on the app to show the release
-                               if eventCache := events.GetEventCache(); 
eventCache != nil {
-                                       message := fmt.Sprintf("Task group '%s' 
in application '%s': allocation resources '%s' are not matching placeholder 
'%s' allocation with ID '%s'", ph.GetTaskGroup(), sa.ApplicationID, 
request.GetAllocatedResource().String(), ph.GetAllocatedResource().String(), 
ph.GetAllocationKey())
-                                       if event, err := 
events.CreateRequestEventRecord(ph.GetAllocationKey(), sa.ApplicationID, 
"releasing placeholder: real allocation is larger than placeholder", message); 
err != nil {
-                                               log.Logger().Warn("Event 
creation failed",
-                                                       zap.String("event 
message", message),
-                                                       zap.Error(err))
-                                       } else {
-                                               eventCache.AddEvent(event)
-                                       }
-                               }
+                               sa.appEvents.sendPlaceholderLargerEvent(ph, 
request)
                                continue
                        }
                        // placeholder is the same or larger continue 
processing and difference is handled when the placeholder
diff --git a/pkg/scheduler/objects/events.go b/pkg/scheduler/objects/events.go
new file mode 100644
index 00000000..16226d9e
--- /dev/null
+++ b/pkg/scheduler/objects/events.go
@@ -0,0 +1,72 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+package objects
+
+import (
+       "fmt"
+
+       "go.uber.org/zap"
+
+       "github.com/apache/yunikorn-core/pkg/events"
+       "github.com/apache/yunikorn-core/pkg/log"
+)
+// applicationEvents sends scheduler events about one Application to the event system; its send methods do nothing when enabled is false.
+type applicationEvents struct {
+       enabled     bool               // set by newApplicationEvents to (eventSystem != nil)
+       eventSystem events.EventSystem // receives the generated si.EventRecord values via AddEvent
+       app         *Application       // application the events describe (queuePath and ApplicationID are read)
+}
+// sendAppDoesNotFitEvent emits an "InsufficientQueueResources" request event for an ask that does not fit into the application's queue.
+func (evt *applicationEvents) sendAppDoesNotFitEvent(request *AllocationAsk) {
+       if !evt.enabled {
+               return
+       }
+       // if the record cannot be built (e.g. the ask has no allocation key) the error is only logged and no event is sent
+       message := fmt.Sprintf("Application %s does not fit into %s queue", 
request.GetApplicationID(), evt.app.queuePath)
+       if event, err := 
events.CreateRequestEventRecord(request.GetAllocationKey(), 
request.GetApplicationID(), "InsufficientQueueResources", message); err != nil {
+               log.Logger().Warn("Event creation failed",
+                       zap.String("event message", message),
+                       zap.Error(err))
+       } else {
+               evt.eventSystem.AddEvent(event)
+       }
+}
+// sendPlaceholderLargerEvent emits a request event, keyed by the placeholder's allocation key, reporting that the real ask's resources do not match placeholder ph.
+func (evt *applicationEvents) sendPlaceholderLargerEvent(ph *Allocation, 
request *AllocationAsk) {
+       if !evt.enabled {
+               return
+       }
+       // the message lists the ask's and the placeholder's allocated resources side by side; creation errors are only logged
+       message := fmt.Sprintf("Task group '%s' in application '%s': allocation 
resources '%s' are not matching placeholder '%s' allocation with ID '%s'", 
ph.GetTaskGroup(), evt.app.ApplicationID, 
request.GetAllocatedResource().String(), ph.GetAllocatedResource().String(), 
ph.GetAllocationKey())
+       if event, err := events.CreateRequestEventRecord(ph.GetAllocationKey(), 
evt.app.ApplicationID, "releasing placeholder: real allocation is larger than 
placeholder", message); err != nil {
+               log.Logger().Warn("Event creation failed",
+                       zap.String("event message", message),
+                       zap.Error(err))
+       } else {
+               evt.eventSystem.AddEvent(event)
+       }
+}
+// newApplicationEvents creates the event sender for app; sending is disabled when evt is nil (e.g. GetEventSystem() before CreateAndSetEventSystem()).
+func newApplicationEvents(app *Application, evt events.EventSystem) 
*applicationEvents {
+       return &applicationEvents{
+               eventSystem: evt,
+               enabled:     evt != nil, // NOTE(review): an interface holding a typed nil pointer is non-nil, so such a value would leave sending enabled
+               app:         app,
+       }
+}
diff --git a/pkg/scheduler/objects/events_test.go 
b/pkg/scheduler/objects/events_test.go
new file mode 100644
index 00000000..688c1666
--- /dev/null
+++ b/pkg/scheduler/objects/events_test.go
@@ -0,0 +1,111 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+package objects
+
+import (
+       "testing"
+
+       "gotest.tools/v3/assert"
+
+       "github.com/apache/yunikorn-scheduler-interface/lib/go/si"
+)
+// EventSystemMock is an in-memory events.EventSystem for tests: it records every AddEvent call and ignores start/stop.
+type EventSystemMock struct {
+       events []*si.EventRecord // events received via AddEvent, in call order
+}
+// AddEvent appends event to m.events; there is no locking, so it is not safe for concurrent use.
+func (m *EventSystemMock) AddEvent(event *si.EventRecord) {
+       m.events = append(m.events, event)
+}
+// StartService is a no-op.
+func (m *EventSystemMock) StartService() {}
+// Stop is a no-op.
+func (m *EventSystemMock) Stop() {}
+// TestSendAppDoesNotFitEvent checks that an event is sent only when an event system is set and the ask has an allocation key.
+func TestSendAppDoesNotFitEvent(t *testing.T) {
+       app := &Application{
+               queuePath: "root.test",
+       }
+
+       // not enabled: with a nil event system the call must be a silent no-op
+       evt := newApplicationEvents(app, nil)
+       assert.Assert(t, evt.eventSystem == nil, "event system should be nil")
+       assert.Assert(t, !evt.enabled, "event system should be disabled")
+       evt.sendAppDoesNotFitEvent(&AllocationAsk{})
+
+       // enabled
+       mock := newEventSystemMock()
+       evt = newApplicationEvents(app, mock)
+       assert.Assert(t, evt.eventSystem != nil, "event system should not be 
nil")
+       assert.Assert(t, evt.enabled, "event system should be enabled")
+       evt.sendAppDoesNotFitEvent(&AllocationAsk{
+               applicationID: appID0,
+               allocationKey: aKey,
+       })
+       assert.Equal(t, 1, len(mock.events), "event was not generated")
+
+       // enabled, ObjectID missing: the ask has no allocation key, so record creation fails and nothing reaches AddEvent
+       mock = newEventSystemMock()
+       evt = newApplicationEvents(app, mock)
+       assert.Assert(t, evt.eventSystem != nil, "event system should not be 
nil")
+       assert.Assert(t, evt.enabled, "event system should be enabled")
+       evt.sendAppDoesNotFitEvent(&AllocationAsk{
+               applicationID: appID0,
+       })
+       assert.Equal(t, 0, len(mock.events), "event was generated")
+}
+// TestSendPlaceholderLargerEvent checks that an event is sent only when an event system is set and the placeholder has an allocation key.
+func TestSendPlaceholderLargerEvent(t *testing.T) {
+       app := &Application{
+               queuePath: "root.test",
+       }
+       // app.ApplicationID is left empty; the "enabled" case below still expects an event
+       // not enabled: with a nil event system the call must be a silent no-op
+       evt := newApplicationEvents(app, nil)
+       assert.Assert(t, evt.eventSystem == nil, "event system should be nil")
+       assert.Assert(t, !evt.enabled, "event system should be disabled")
+       evt.sendPlaceholderLargerEvent(&Allocation{}, &AllocationAsk{})
+
+       // enabled
+       mock := newEventSystemMock()
+       evt = newApplicationEvents(app, mock)
+       assert.Assert(t, evt.eventSystem != nil, "event system should not be 
nil")
+       assert.Assert(t, evt.enabled, "event system should be enabled")
+       evt.sendPlaceholderLargerEvent(&Allocation{
+               allocationKey: aKey,
+       }, &AllocationAsk{
+               applicationID: appID0,
+               allocationKey: aKey,
+       })
+       assert.Equal(t, 1, len(mock.events), "event was not generated")
+
+       // enabled, ObjectID missing: the placeholder (not the ask) has no allocation key, so record creation fails and nothing is sent
+       mock = newEventSystemMock()
+       evt = newApplicationEvents(app, mock)
+       assert.Assert(t, evt.eventSystem != nil, "event system should not be 
nil")
+       assert.Assert(t, evt.enabled, "event system should be enabled")
+       evt.sendPlaceholderLargerEvent(&Allocation{}, &AllocationAsk{
+               applicationID: appID0,
+       })
+       assert.Equal(t, 0, len(mock.events), "event was generated")
+}
+// newEventSystemMock returns an EventSystemMock whose event slice is empty but non-nil.
+func newEventSystemMock() *EventSystemMock {
+       return &EventSystemMock{events: make([]*si.EventRecord, 0)}
+}
diff --git a/pkg/scheduler/partition_test.go b/pkg/scheduler/partition_test.go
index 2a7f2895..ada485be 100644
--- a/pkg/scheduler/partition_test.go
+++ b/pkg/scheduler/partition_test.go
@@ -2508,9 +2508,9 @@ func TestAddTGAppDynamic(t *testing.T) {
 
 func TestPlaceholderSmallerThanReal(t *testing.T) {
        setupUGM()
-       events.CreateAndSetEventCache()
-       cache := events.GetEventCache()
-       cache.StartService()
+       events.CreateAndSetEventSystem()
+       eventSystem := events.GetEventSystem().(*events.EventSystemImpl) //nolint:errcheck
+       eventSystem.StartServiceWithPublisher(false)
 
        partition, err := newBasePartition()
        assert.NilError(t, err, "partition create failed")
@@ -2553,11 +2553,11 @@ func TestPlaceholderSmallerThanReal(t *testing.T) {
 
        // wait for events to be processed
        err = common.WaitFor(1*time.Millisecond, 10*time.Millisecond, func() bool {
-               return cache.Store.CountStoredEvents() == 1
+               return eventSystem.Store.CountStoredEvents() == 1
        })
        assert.NilError(t, err, "the event should have been processed")
 
-       records := cache.Store.CollectEvents()
+       records := eventSystem.Store.CollectEvents()
        if records == nil {
                t.Fatal("collecting eventChannel should return something")
        }
@@ -2587,9 +2587,9 @@ func TestPlaceholderSmallerThanReal(t *testing.T) {
 // one real allocation should trigger cleanup of all placeholders
 func TestPlaceholderSmallerMulti(t *testing.T) {
        setupUGM()
-       events.CreateAndSetEventCache()
-       cache := events.GetEventCache()
-       cache.StartService()
+       events.CreateAndSetEventSystem()
+       eventSystem := events.GetEventSystem().(*events.EventSystemImpl) //nolint:errcheck
+       eventSystem.StartServiceWithPublisher(false)
 
        partition, err := newBasePartition()
        assert.NilError(t, err, "partition create failed")
@@ -2639,12 +2639,12 @@ func TestPlaceholderSmallerMulti(t *testing.T) {
 
        // wait for events to be processed
        err = common.WaitFor(1*time.Millisecond, 10*time.Millisecond, func() bool {
-               fmt.Printf("checking event length: %d\n", cache.Store.CountStoredEvents())
-               return cache.Store.CountStoredEvents() == phCount
+               fmt.Printf("checking event length: %d\n", eventSystem.Store.CountStoredEvents())
+               return eventSystem.Store.CountStoredEvents() == phCount
        })
        assert.NilError(t, err, "the events should have been processed")
 
-       records := cache.Store.CollectEvents()
+       records := eventSystem.Store.CollectEvents()
        if records == nil {
                t.Fatal("collecting eventChannel should return something")
        }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to