This is an automated email from the ASF dual-hosted git repository.
pbacsko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/yunikorn-core.git
The following commit(s) were added to refs/heads/master by this push:
new d2d7ce89 [YUNIKORN-1774] Event cache: misc cleanup (#559)
d2d7ce89 is described below
commit d2d7ce89457cbbce140e2737098ae62f6a9d8fee
Author: Peter Bacsko <[email protected]>
AuthorDate: Fri Jun 9 10:41:56 2023 +0200
[YUNIKORN-1774] Event cache: misc cleanup (#559)
Closes: #559
Signed-off-by: Peter Bacsko <[email protected]>
---
pkg/entrypoint/entrypoint.go | 23 ++--
pkg/events/event_cache_test.go | 164 -------------------------
pkg/events/event_publisher.go | 33 ++---
pkg/events/event_publisher_test.go | 14 ++-
pkg/events/event_store.go | 18 +--
pkg/events/event_store_test.go | 4 +-
pkg/events/{event_cache.go => event_system.go} | 57 +++++----
pkg/events/event_system_test.go | 90 ++++++++++++++
pkg/scheduler/objects/application.go | 26 +---
pkg/scheduler/objects/events.go | 72 +++++++++++
pkg/scheduler/objects/events_test.go | 111 +++++++++++++++++
pkg/scheduler/partition_test.go | 22 ++--
12 files changed, 357 insertions(+), 277 deletions(-)
diff --git a/pkg/entrypoint/entrypoint.go b/pkg/entrypoint/entrypoint.go
index 2fee2dc2..2a42a989 100644
--- a/pkg/entrypoint/entrypoint.go
+++ b/pkg/entrypoint/entrypoint.go
@@ -36,7 +36,7 @@ type startupOptions struct {
manualScheduleFlag bool
startWebAppFlag bool
metricsHistorySize int
- eventCacheEnabled bool
+ eventSystemEnabled bool
}
func StartAllServices() *ServiceContext {
@@ -46,7 +46,7 @@ func StartAllServices() *ServiceContext {
manualScheduleFlag: false,
startWebAppFlag: true,
metricsHistorySize: 1440,
- eventCacheEnabled: false,
+ eventSystemEnabled: false,
})
}
@@ -63,18 +63,15 @@ func StartAllServicesWithManualScheduler() *ServiceContext {
manualScheduleFlag: true,
startWebAppFlag: false,
metricsHistorySize: 0,
- eventCacheEnabled: false,
+ eventSystemEnabled: false,
})
}
func startAllServicesWithParameters(opts startupOptions) *ServiceContext {
- var eventCache *events.EventCache
- var eventPublisher events.EventPublisher
- if opts.eventCacheEnabled {
- log.Logger().Info("creating event cache")
- events.CreateAndSetEventCache()
- eventCache = events.GetEventCache()
- eventPublisher = events.CreateShimPublisher(eventCache.Store)
+ if opts.eventSystemEnabled {
+ log.Logger().Info("creating and starting event system")
+ events.CreateAndSetEventSystem()
+ events.GetEventSystem().StartService()
}
sched := scheduler.NewScheduler()
@@ -89,12 +86,6 @@ func startAllServicesWithParameters(opts startupOptions)
*ServiceContext {
log.Logger().Info("ServiceContext start scheduling services")
sched.StartService(eventHandler, opts.manualScheduleFlag)
proxy.StartService(eventHandler)
- if opts.eventCacheEnabled && eventCache != nil {
- eventCache.StartService()
- if eventPublisher != nil {
- eventPublisher.StartService()
- }
- }
context := &ServiceContext{
RMProxy: proxy,
diff --git a/pkg/events/event_cache_test.go b/pkg/events/event_cache_test.go
deleted file mode 100644
index f7a72754..00000000
--- a/pkg/events/event_cache_test.go
+++ /dev/null
@@ -1,164 +0,0 @@
-/*
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements. See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership. The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
-*/
-
-package events
-
-import (
- "sync"
- "testing"
- "time"
-
- "go.uber.org/zap"
-
- "gotest.tools/v3/assert"
-
- "github.com/apache/yunikorn-core/pkg/common"
- "github.com/apache/yunikorn-core/pkg/log"
- "github.com/apache/yunikorn-scheduler-interface/lib/go/si"
-)
-
-// the EventCache should be nil by default, until not set by
CreateAndSetEventCache()
-func TestGetEventCache(t *testing.T) {
- cache := GetEventCache()
- assert.Assert(t, cache == nil, "the cache should be nil by default")
- CreateAndSetEventCache()
- cache = GetEventCache()
- assert.Assert(t, cache != nil, "the cache should not be nil")
-}
-
-// StartService() and Stop() must not cause panic
-func TestSimpleStartAndStop(t *testing.T) {
- CreateAndSetEventCache()
- cache := GetEventCache()
- // adding event to stopped cache does not cause panic
- cache.AddEvent(nil)
- cache.StartService()
- // add an event
- cache.AddEvent(nil)
- cache.Stop()
- // adding event to stopped cache does not cause panic
- cache.AddEvent(nil)
-}
-
-// if an EventRecord is added to the EventCache, the same record
-// should be retrieved from the EventStore
-func TestSingleEventStoredCorrectly(t *testing.T) {
- CreateAndSetEventCache()
- cache := GetEventCache()
- cache.StartService()
-
- event := si.EventRecord{
- Type: si.EventRecord_REQUEST,
- ObjectID: "alloc1",
- GroupID: "app1",
- Reason: "reason",
- Message: "message",
- }
- cache.AddEvent(&event)
-
- // wait for events to be processed
- err := common.WaitFor(1*time.Millisecond, 10*time.Millisecond, func()
bool {
- return cache.Store.CountStoredEvents() == 1
- })
- assert.NilError(t, err, "the event should have been processed")
-
- records := cache.Store.CollectEvents()
- if records == nil {
- t.Fatal("collecting eventChannel should return something")
- }
- assert.Equal(t, len(records), 1)
- record := records[0]
- assert.Equal(t, record.Type, si.EventRecord_REQUEST)
- assert.Equal(t, record.ObjectID, "alloc1")
- assert.Equal(t, record.GroupID, "app1")
- assert.Equal(t, record.Message, "message")
- assert.Equal(t, record.Reason, "reason")
-}
-
-type slowEventStore struct {
- busy bool
-
- sync.Mutex
-}
-
-func (ses *slowEventStore) setBusy(b bool) {
- ses.Lock()
- defer ses.Unlock()
-
- log.Logger().Info("setting busy to ", zap.Bool("bool", b))
-
- ses.busy = b
-}
-
-func (ses *slowEventStore) getBusy() bool {
- ses.Lock()
- defer ses.Unlock()
-
- log.Logger().Info("getting busy ", zap.Bool("bool", ses.busy))
-
- return ses.busy
-}
-
-func (ses *slowEventStore) Store(*si.EventRecord) {
- ses.setBusy(true)
- defer ses.setBusy(false)
-
- time.Sleep(50 * time.Millisecond)
-}
-
-func (ses *slowEventStore) CollectEvents() []*si.EventRecord {
- return nil
-}
-
-func (ses *slowEventStore) CountStoredEvents() int {
- return 0
-}
-
-// this test checks that if storing events is much slower
-// than the rate the events are generated, it doesn't cause
-// panic by filling up the EventChannel
-func TestSlowChannelFillingUpEventChannel(t *testing.T) {
- defaultEventChannelSize = 2
-
- store := slowEventStore{}
- cache := createEventCacheInternal(&store)
-
- cache.StartService()
-
- event := si.EventRecord{
- Type: si.EventRecord_REQUEST,
- ObjectID: "alloc1",
- GroupID: "app1",
- Reason: "reason",
- Message: "message",
- }
-
- cache.AddEvent(&event)
-
- // wait for events to be occupy the slow store
- err := common.WaitFor(1*time.Millisecond, 100*time.Millisecond,
store.getBusy)
- assert.NilError(t, err, "the event should have been processed")
- assert.Equal(t, len(cache.channel), 0, "the number of queued elements
should be zero")
-
- cache.AddEvent(&event)
- assert.Equal(t, len(cache.channel), 1, "the number of queued elements
should be two")
- cache.AddEvent(&event)
- assert.Equal(t, len(cache.channel), 2, "the number of queued elements
should be two")
- cache.AddEvent(&event)
- assert.Equal(t, len(cache.channel), 2, "the number of queued elements
should be two")
-}
diff --git a/pkg/events/event_publisher.go b/pkg/events/event_publisher.go
index ba1e45e2..6225a1ee 100644
--- a/pkg/events/event_publisher.go
+++ b/pkg/events/event_publisher.go
@@ -31,38 +31,25 @@ import (
// stores the push event internal
var defaultPushEventInterval = 2 * time.Second
-type EventPublisher interface {
- StartService()
- Stop()
-}
-
-type shimPublisher struct {
- store EventStore
+type EventPublisher struct {
+ store *EventStore
pushEventInterval time.Duration
- stop atomic.Value
-}
-
-func CreateShimPublisher(store EventStore) EventPublisher {
- return createShimPublisherInternal(store)
-}
-
-func createShimPublisherInternal(store EventStore) *shimPublisher {
- return createShimPublisherWithParameters(store,
defaultPushEventInterval)
+ stop atomic.Bool
}
-func createShimPublisherWithParameters(store EventStore, pushEventInterval
time.Duration) *shimPublisher {
- publisher := &shimPublisher{
+func CreateShimPublisher(store *EventStore) *EventPublisher {
+ publisher := &EventPublisher{
store: store,
- pushEventInterval: pushEventInterval,
+ pushEventInterval: defaultPushEventInterval,
}
publisher.stop.Store(false)
return publisher
}
-func (sp *shimPublisher) StartService() {
+func (sp *EventPublisher) StartService() {
go func() {
for {
- if sp.stop.Load().(bool) {
+ if sp.stop.Load() {
break
}
messages := sp.store.CollectEvents()
@@ -77,10 +64,10 @@ func (sp *shimPublisher) StartService() {
}()
}
-func (sp *shimPublisher) Stop() {
+func (sp *EventPublisher) Stop() {
sp.stop.Store(true)
}
-func (sp *shimPublisher) getEventStore() EventStore {
+func (sp *EventPublisher) getEventStore() *EventStore {
return sp.store
}
diff --git a/pkg/events/event_publisher_test.go
b/pkg/events/event_publisher_test.go
index 85d87cb4..b9d61e97 100644
--- a/pkg/events/event_publisher_test.go
+++ b/pkg/events/event_publisher_test.go
@@ -79,8 +79,8 @@ func TestCreateShimPublisher(t *testing.T) {
// StartService() and Stop() functions should not cause panic
func TestServiceStartStopInternal(t *testing.T) {
- store := newEventStoreImpl()
- publisher := createShimPublisherInternal(store)
+ store := newEventStore()
+ publisher := CreateShimPublisher(store)
publisher.StartService()
defer publisher.Stop()
assert.Equal(t, publisher.getEventStore(), store)
@@ -89,8 +89,9 @@ func TestServiceStartStopInternal(t *testing.T) {
func TestNoFillWithoutEventPluginRegistered(t *testing.T) {
pushEventInterval := 2 * time.Millisecond
- store := newEventStoreImpl()
- publisher := createShimPublisherWithParameters(store, pushEventInterval)
+ store := newEventStore()
+ publisher := CreateShimPublisher(store)
+ publisher.pushEventInterval = pushEventInterval
publisher.StartService()
defer publisher.Stop()
@@ -116,8 +117,9 @@ func TestPublisherSendsEvent(t *testing.T) {
eventPlugin, err := createEventPluginForTest()
assert.NilError(t, err, "could not create event plugin for test")
- store := newEventStoreImpl()
- publisher := createShimPublisherWithParameters(store, pushEventInterval)
+ store := newEventStore()
+ publisher := CreateShimPublisher(store)
+ publisher.pushEventInterval = pushEventInterval
publisher.StartService()
defer publisher.Stop()
diff --git a/pkg/events/event_store.go b/pkg/events/event_store.go
index 8625a641..cd2cc603 100644
--- a/pkg/events/event_store.go
+++ b/pkg/events/event_store.go
@@ -33,25 +33,19 @@ var defaultEventStoreSize = 1000
//
// Assuming the rate of events generated by the scheduler component in a given
time period
// is high, calling CollectEvents() periodically should be fine.
-type EventStore interface {
- Store(event *si.EventRecord)
- CollectEvents() []*si.EventRecord
- CountStoredEvents() int
-}
-
-type defaultEventStore struct {
+type EventStore struct {
events []*si.EventRecord
idx int // points where to store the next event
sync.RWMutex
}
-func newEventStoreImpl() EventStore {
- return &defaultEventStore{
+func newEventStore() *EventStore {
+ return &EventStore{
events: make([]*si.EventRecord, defaultEventStoreSize),
}
}
-func (es *defaultEventStore) Store(event *si.EventRecord) {
+func (es *EventStore) Store(event *si.EventRecord) {
es.Lock()
defer es.Unlock()
@@ -65,7 +59,7 @@ func (es *defaultEventStore) Store(event *si.EventRecord) {
metrics.GetEventMetrics().IncEventsStored()
}
-func (es *defaultEventStore) CollectEvents() []*si.EventRecord {
+func (es *EventStore) CollectEvents() []*si.EventRecord {
es.Lock()
defer es.Unlock()
@@ -77,7 +71,7 @@ func (es *defaultEventStore) CollectEvents()
[]*si.EventRecord {
return messages
}
-func (es *defaultEventStore) CountStoredEvents() int {
+func (es *EventStore) CountStoredEvents() int {
es.RLock()
defer es.RUnlock()
diff --git a/pkg/events/event_store_test.go b/pkg/events/event_store_test.go
index 5eb00bb4..93b46f30 100644
--- a/pkg/events/event_store_test.go
+++ b/pkg/events/event_store_test.go
@@ -29,7 +29,7 @@ import (
// the fields of an event should match after stored and retrieved
func TestStoreAndRetrieve(t *testing.T) {
- store := newEventStoreImpl()
+ store := newEventStore()
event := si.EventRecord{
Type: si.EventRecord_REQUEST,
ObjectID: "alloc1",
@@ -58,7 +58,7 @@ func TestStoreAndRetrieve(t *testing.T) {
func TestStoreWithLimitedSize(t *testing.T) {
defaultEventStoreSize = 3
- store := newEventStoreImpl()
+ store := newEventStore()
for i := 0; i < 5; i++ {
event := &si.EventRecord{
Type: si.EventRecord_REQUEST,
diff --git a/pkg/events/event_cache.go b/pkg/events/event_system.go
similarity index 66%
rename from pkg/events/event_cache.go
rename to pkg/events/event_system.go
index baf5fd2e..304e0dd9 100644
--- a/pkg/events/event_cache.go
+++ b/pkg/events/event_system.go
@@ -29,39 +29,45 @@ import (
// need to change for testing
var defaultEventChannelSize = 100000
-var cache *EventCache
+var ev EventSystem
-type EventCache struct {
- Store EventStore // storing eventChannel
+type EventSystem interface {
+ AddEvent(event *si.EventRecord)
+ StartService()
+ Stop()
+}
+
+type EventSystemImpl struct {
+ Store *EventStore // storing eventChannel
+ publisher *EventPublisher
channel chan *si.EventRecord // channelling input eventChannel
stop chan bool // whether the service is stop
+ stopped bool
sync.Mutex
}
-func GetEventCache() *EventCache {
- return cache
+func GetEventSystem() EventSystem {
+ return ev
}
-func CreateAndSetEventCache() {
- cache = createEventStoreAndCache()
+func CreateAndSetEventSystem() {
+ store := newEventStore()
+ ev = &EventSystemImpl{
+ Store: store,
+ channel: make(chan *si.EventRecord, defaultEventChannelSize),
+ stop: make(chan bool),
+ stopped: false,
+ publisher: CreateShimPublisher(store),
+ }
}
-func createEventStoreAndCache() *EventCache {
- store := newEventStoreImpl()
- return createEventCacheInternal(store)
+func (ec *EventSystemImpl) StartService() {
+ ec.StartServiceWithPublisher(true)
}
-func createEventCacheInternal(store EventStore) *EventCache {
- return &EventCache{
- Store: store,
- channel: make(chan *si.EventRecord, defaultEventChannelSize),
- stop: make(chan bool),
- }
-}
-
-func (ec *EventCache) StartService() {
+func (ec *EventSystemImpl) StartServiceWithPublisher(withPublisher bool) {
go func() {
for {
select {
@@ -78,20 +84,29 @@ func (ec *EventCache) StartService() {
}
}
}()
+ if withPublisher {
+ ec.publisher.StartService()
+ }
}
-func (ec *EventCache) Stop() {
+func (ec *EventSystemImpl) Stop() {
ec.Lock()
defer ec.Unlock()
+ if ec.stopped {
+ return
+ }
+
ec.stop <- true
if ec.channel != nil {
close(ec.channel)
ec.channel = nil
}
+ ec.publisher.Stop()
+ ec.stopped = true
}
-func (ec *EventCache) AddEvent(event *si.EventRecord) {
+func (ec *EventSystemImpl) AddEvent(event *si.EventRecord) {
metrics.GetEventMetrics().IncEventsCreated()
select {
case ec.channel <- event:
diff --git a/pkg/events/event_system_test.go b/pkg/events/event_system_test.go
new file mode 100644
index 00000000..19dab280
--- /dev/null
+++ b/pkg/events/event_system_test.go
@@ -0,0 +1,90 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+package events
+
+import (
+ "testing"
+ "time"
+
+ "gotest.tools/v3/assert"
+
+ "github.com/apache/yunikorn-core/pkg/common"
+ "github.com/apache/yunikorn-scheduler-interface/lib/go/si"
+)
+
+// the EventSystem should be nil by default, until not set by
CreateAndSetEventSystem()
+func TestGetEventSystem(t *testing.T) {
+ eventSystem := GetEventSystem()
+ assert.Assert(t, eventSystem == nil, "the eventSystem should be nil by
default")
+ CreateAndSetEventSystem()
+ eventSystem = GetEventSystem()
+ assert.Assert(t, eventSystem != nil, "the eventSystem should not be
nil")
+}
+
+// StartService() and Stop() must not cause panic
+func TestSimpleStartAndStop(t *testing.T) {
+ CreateAndSetEventSystem()
+ eventSystem := GetEventSystem()
+ // adding event to stopped eventSystem does not cause panic
+ eventSystem.AddEvent(nil)
+ eventSystem.StartService()
+ defer eventSystem.Stop()
+ // add an event
+ eventSystem.AddEvent(nil)
+ eventSystem.Stop()
+ // adding event to stopped eventSystem does not cause panic
+ eventSystem.AddEvent(nil)
+}
+
+// if an EventRecord is added to the EventSystem, the same record
+// should be retrieved from the EventStore
+func TestSingleEventStoredCorrectly(t *testing.T) {
+ CreateAndSetEventSystem()
+ eventSystem := GetEventSystem().(*EventSystemImpl) //nolint:errcheck
+ // don't run publisher, because it can collect the event while we're
waiting
+ eventSystem.StartServiceWithPublisher(false)
+ defer eventSystem.Stop()
+
+ event := si.EventRecord{
+ Type: si.EventRecord_REQUEST,
+ ObjectID: "alloc1",
+ GroupID: "app1",
+ Reason: "reason",
+ Message: "message",
+ }
+ eventSystem.AddEvent(&event)
+
+ // wait for events to be processed
+ err := common.WaitFor(time.Millisecond, time.Second, func() bool {
+ return eventSystem.Store.CountStoredEvents() == 1
+ })
+ assert.NilError(t, err, "the event should have been processed")
+
+ records := eventSystem.Store.CollectEvents()
+ if records == nil {
+ t.Fatal("collecting eventChannel should return something")
+ }
+ assert.Equal(t, len(records), 1)
+ record := records[0]
+ assert.Equal(t, record.Type, si.EventRecord_REQUEST)
+ assert.Equal(t, record.ObjectID, "alloc1")
+ assert.Equal(t, record.GroupID, "app1")
+ assert.Equal(t, record.Message, "message")
+ assert.Equal(t, record.Reason, "reason")
+}
diff --git a/pkg/scheduler/objects/application.go
b/pkg/scheduler/objects/application.go
index 6fcbe4d2..9d061565 100644
--- a/pkg/scheduler/objects/application.go
+++ b/pkg/scheduler/objects/application.go
@@ -107,6 +107,7 @@ type Application struct {
rmEventHandler handler.EventHandler
rmID string
terminatedCallback func(appID string)
+ appEvents *applicationEvents
sync.RWMutex
}
@@ -196,6 +197,7 @@ func NewApplication(siApp *si.AddApplicationRequest, ugi
security.UserGroup, eve
app.user = ugi
app.rmEventHandler = eventHandler
app.rmID = rmID
+ app.appEvents = newApplicationEvents(app, events.GetEventSystem())
return app
}
@@ -960,17 +962,7 @@ func (sa *Application) tryAllocate(headRoom
*resources.Resource, preemptionDelay
}
}
- // post scheduling events via the event plugin
- if eventCache := events.GetEventCache(); eventCache !=
nil {
- message := fmt.Sprintf("Application %s does not
fit into %s queue", request.GetApplicationID(), sa.queuePath)
- if event, err :=
events.CreateRequestEventRecord(request.GetAllocationKey(),
request.GetApplicationID(), "InsufficientQueueResources", message); err != nil {
- log.Logger().Warn("Event creation
failed",
- zap.String("event message",
message),
- zap.Error(err))
- } else {
- eventCache.AddEvent(event)
- }
- }
+ sa.appEvents.sendAppDoesNotFitEvent(request)
continue
}
@@ -1126,17 +1118,7 @@ func (sa *Application)
tryPlaceholderAllocate(nodeIterator func() NodeIterator,
// release the placeholder and tell the RM
ph.SetReleased(true)
sa.notifyRMAllocationReleased(sa.rmID,
[]*Allocation{ph}, si.TerminationType_TIMEOUT, "cancel placeholder: resource
incompatible")
- // add an event on the app to show the release
- if eventCache := events.GetEventCache();
eventCache != nil {
- message := fmt.Sprintf("Task group '%s'
in application '%s': allocation resources '%s' are not matching placeholder
'%s' allocation with ID '%s'", ph.GetTaskGroup(), sa.ApplicationID,
request.GetAllocatedResource().String(), ph.GetAllocatedResource().String(),
ph.GetAllocationKey())
- if event, err :=
events.CreateRequestEventRecord(ph.GetAllocationKey(), sa.ApplicationID,
"releasing placeholder: real allocation is larger than placeholder", message);
err != nil {
- log.Logger().Warn("Event
creation failed",
- zap.String("event
message", message),
- zap.Error(err))
- } else {
- eventCache.AddEvent(event)
- }
- }
+ sa.appEvents.sendPlaceholderLargerEvent(ph,
request)
continue
}
// placeholder is the same or larger continue
processing and difference is handled when the placeholder
diff --git a/pkg/scheduler/objects/events.go b/pkg/scheduler/objects/events.go
new file mode 100644
index 00000000..16226d9e
--- /dev/null
+++ b/pkg/scheduler/objects/events.go
@@ -0,0 +1,72 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+package objects
+
+import (
+ "fmt"
+
+ "go.uber.org/zap"
+
+ "github.com/apache/yunikorn-core/pkg/events"
+ "github.com/apache/yunikorn-core/pkg/log"
+)
+
+type applicationEvents struct {
+ enabled bool
+ eventSystem events.EventSystem
+ app *Application
+}
+
+func (evt *applicationEvents) sendAppDoesNotFitEvent(request *AllocationAsk) {
+ if !evt.enabled {
+ return
+ }
+
+ message := fmt.Sprintf("Application %s does not fit into %s queue",
request.GetApplicationID(), evt.app.queuePath)
+ if event, err :=
events.CreateRequestEventRecord(request.GetAllocationKey(),
request.GetApplicationID(), "InsufficientQueueResources", message); err != nil {
+ log.Logger().Warn("Event creation failed",
+ zap.String("event message", message),
+ zap.Error(err))
+ } else {
+ evt.eventSystem.AddEvent(event)
+ }
+}
+
+func (evt *applicationEvents) sendPlaceholderLargerEvent(ph *Allocation,
request *AllocationAsk) {
+ if !evt.enabled {
+ return
+ }
+
+ message := fmt.Sprintf("Task group '%s' in application '%s': allocation
resources '%s' are not matching placeholder '%s' allocation with ID '%s'",
ph.GetTaskGroup(), evt.app.ApplicationID,
request.GetAllocatedResource().String(), ph.GetAllocatedResource().String(),
ph.GetAllocationKey())
+ if event, err := events.CreateRequestEventRecord(ph.GetAllocationKey(),
evt.app.ApplicationID, "releasing placeholder: real allocation is larger than
placeholder", message); err != nil {
+ log.Logger().Warn("Event creation failed",
+ zap.String("event message", message),
+ zap.Error(err))
+ } else {
+ evt.eventSystem.AddEvent(event)
+ }
+}
+
+func newApplicationEvents(app *Application, evt events.EventSystem)
*applicationEvents {
+ return &applicationEvents{
+ eventSystem: evt,
+ enabled: evt != nil,
+ app: app,
+ }
+}
diff --git a/pkg/scheduler/objects/events_test.go
b/pkg/scheduler/objects/events_test.go
new file mode 100644
index 00000000..688c1666
--- /dev/null
+++ b/pkg/scheduler/objects/events_test.go
@@ -0,0 +1,111 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+package objects
+
+import (
+ "testing"
+
+ "gotest.tools/v3/assert"
+
+ "github.com/apache/yunikorn-scheduler-interface/lib/go/si"
+)
+
+type EventSystemMock struct {
+ events []*si.EventRecord
+}
+
+func (m *EventSystemMock) AddEvent(event *si.EventRecord) {
+ m.events = append(m.events, event)
+}
+
+func (m *EventSystemMock) StartService() {}
+
+func (m *EventSystemMock) Stop() {}
+
+func TestSendAppDoesNotFitEvent(t *testing.T) {
+ app := &Application{
+ queuePath: "root.test",
+ }
+
+ // not enabled
+ evt := newApplicationEvents(app, nil)
+ assert.Assert(t, evt.eventSystem == nil, "event system should be nil")
+ assert.Assert(t, !evt.enabled, "event system should be disabled")
+ evt.sendAppDoesNotFitEvent(&AllocationAsk{})
+
+ // enabled
+ mock := newEventSystemMock()
+ evt = newApplicationEvents(app, mock)
+ assert.Assert(t, evt.eventSystem != nil, "event system should not be
nil")
+ assert.Assert(t, evt.enabled, "event system should be enabled")
+ evt.sendAppDoesNotFitEvent(&AllocationAsk{
+ applicationID: appID0,
+ allocationKey: aKey,
+ })
+ assert.Equal(t, 1, len(mock.events), "event was not generated")
+
+ // enabled, ObjectID missing
+ mock = newEventSystemMock()
+ evt = newApplicationEvents(app, mock)
+ assert.Assert(t, evt.eventSystem != nil, "event system should not be
nil")
+ assert.Assert(t, evt.enabled, "event system should be enabled")
+ evt.sendAppDoesNotFitEvent(&AllocationAsk{
+ applicationID: appID0,
+ })
+ assert.Equal(t, 0, len(mock.events), "event was generated")
+}
+
+func TestSendPlaceholderLargerEvent(t *testing.T) {
+ app := &Application{
+ queuePath: "root.test",
+ }
+
+ // not enabled
+ evt := newApplicationEvents(app, nil)
+ assert.Assert(t, evt.eventSystem == nil, "event system should be nil")
+ assert.Assert(t, !evt.enabled, "event system should be disabled")
+ evt.sendPlaceholderLargerEvent(&Allocation{}, &AllocationAsk{})
+
+ // enabled
+ mock := newEventSystemMock()
+ evt = newApplicationEvents(app, mock)
+ assert.Assert(t, evt.eventSystem != nil, "event system should not be
nil")
+ assert.Assert(t, evt.enabled, "event system should be enabled")
+ evt.sendPlaceholderLargerEvent(&Allocation{
+ allocationKey: aKey,
+ }, &AllocationAsk{
+ applicationID: appID0,
+ allocationKey: aKey,
+ })
+ assert.Equal(t, 1, len(mock.events), "event was not generated")
+
+ // enabled, ObjectID missing
+ mock = newEventSystemMock()
+ evt = newApplicationEvents(app, mock)
+ assert.Assert(t, evt.eventSystem != nil, "event system should not be
nil")
+ assert.Assert(t, evt.enabled, "event system should be enabled")
+ evt.sendPlaceholderLargerEvent(&Allocation{}, &AllocationAsk{
+ applicationID: appID0,
+ })
+ assert.Equal(t, 0, len(mock.events), "event was generated")
+}
+
+func newEventSystemMock() *EventSystemMock {
+ return &EventSystemMock{events: make([]*si.EventRecord, 0)}
+}
diff --git a/pkg/scheduler/partition_test.go b/pkg/scheduler/partition_test.go
index 2a7f2895..ada485be 100644
--- a/pkg/scheduler/partition_test.go
+++ b/pkg/scheduler/partition_test.go
@@ -2508,9 +2508,9 @@ func TestAddTGAppDynamic(t *testing.T) {
func TestPlaceholderSmallerThanReal(t *testing.T) {
setupUGM()
- events.CreateAndSetEventCache()
- cache := events.GetEventCache()
- cache.StartService()
+ events.CreateAndSetEventSystem()
+ eventSystem := events.GetEventSystem().(*events.EventSystemImpl)
//nolint:errcheck
+ eventSystem.StartServiceWithPublisher(false)
partition, err := newBasePartition()
assert.NilError(t, err, "partition create failed")
@@ -2553,11 +2553,11 @@ func TestPlaceholderSmallerThanReal(t *testing.T) {
// wait for events to be processed
err = common.WaitFor(1*time.Millisecond, 10*time.Millisecond, func()
bool {
- return cache.Store.CountStoredEvents() == 1
+ return eventSystem.Store.CountStoredEvents() == 1
})
assert.NilError(t, err, "the event should have been processed")
- records := cache.Store.CollectEvents()
+ records := eventSystem.Store.CollectEvents()
if records == nil {
t.Fatal("collecting eventChannel should return something")
}
@@ -2587,9 +2587,9 @@ func TestPlaceholderSmallerThanReal(t *testing.T) {
// one real allocation should trigger cleanup of all placeholders
func TestPlaceholderSmallerMulti(t *testing.T) {
setupUGM()
- events.CreateAndSetEventCache()
- cache := events.GetEventCache()
- cache.StartService()
+ events.CreateAndSetEventSystem()
+ eventSystem := events.GetEventSystem().(*events.EventSystemImpl)
//nolint:errcheck
+ eventSystem.StartServiceWithPublisher(false)
partition, err := newBasePartition()
assert.NilError(t, err, "partition create failed")
@@ -2639,12 +2639,12 @@ func TestPlaceholderSmallerMulti(t *testing.T) {
// wait for events to be processed
err = common.WaitFor(1*time.Millisecond, 10*time.Millisecond, func()
bool {
- fmt.Printf("checking event length: %d\n",
cache.Store.CountStoredEvents())
- return cache.Store.CountStoredEvents() == phCount
+ fmt.Printf("checking event length: %d\n",
eventSystem.Store.CountStoredEvents())
+ return eventSystem.Store.CountStoredEvents() == phCount
})
assert.NilError(t, err, "the events should have been processed")
- records := cache.Store.CollectEvents()
+ records := eventSystem.Store.CollectEvents()
if records == nil {
t.Fatal("collecting eventChannel should return something")
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]