This is an automated email from the ASF dual-hosted git repository.
pbacsko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/yunikorn-core.git
The following commit(s) were added to refs/heads/master by this push:
new 7b807cf1 [YUNIKORN-2513] Fix event system to use event.requestCapacity (#833)
7b807cf1 is described below
commit 7b807cf10d02f42e2e6d312391d6e2c6ddf2e607
Author: Peter Bacsko <[email protected]>
AuthorDate: Sun Apr 7 02:48:57 2024 +0200
[YUNIKORN-2513] Fix event system to use event.requestCapacity (#833)
Closes: #833
Signed-off-by: Peter Bacsko <[email protected]>
---
pkg/events/event_publisher_test.go | 6 ++--
pkg/events/event_store.go | 34 +++++++++++-------
pkg/events/event_store_test.go | 46 ++++++++++++++++++++----
pkg/events/event_system.go | 60 +++++++++++++++++++------------
pkg/events/event_system_test.go | 54 ++++++++++++++++++++++++++--
pkg/scheduler/objects/application_test.go | 8 ++---
pkg/scheduler/objects/queue_test.go | 2 +-
pkg/scheduler/partition_test.go | 2 +-
pkg/webservice/handlers_test.go | 2 +-
9 files changed, 159 insertions(+), 55 deletions(-)
diff --git a/pkg/events/event_publisher_test.go b/pkg/events/event_publisher_test.go
index d83ffae8..79fc95fc 100644
--- a/pkg/events/event_publisher_test.go
+++ b/pkg/events/event_publisher_test.go
@@ -38,7 +38,7 @@ func TestCreateShimPublisher(t *testing.T) {
// StartService() and Stop() functions should not cause panic
func TestServiceStartStopInternal(t *testing.T) {
- store := newEventStore()
+ store := newEventStore(1000)
publisher := CreateShimPublisher(store)
publisher.StartService()
defer publisher.Stop()
@@ -46,7 +46,7 @@ func TestServiceStartStopInternal(t *testing.T) {
}
func TestNoFillWithoutEventPluginRegistered(t *testing.T) {
- store := newEventStore()
+ store := newEventStore(1000)
publisher := CreateShimPublisher(store)
publisher.pushEventInterval = time.Millisecond
publisher.StartService()
@@ -76,7 +76,7 @@ func TestPublisherSendsEvent(t *testing.T) {
t.Fatal("could not register event plugin for test")
}
- store := newEventStore()
+ store := newEventStore(1000)
publisher := CreateShimPublisher(store)
publisher.pushEventInterval = time.Millisecond
publisher.StartService()
diff --git a/pkg/events/event_store.go b/pkg/events/event_store.go
index ff27079b..b9cb38f0 100644
--- a/pkg/events/event_store.go
+++ b/pkg/events/event_store.go
@@ -19,13 +19,14 @@
package events
import (
+ "go.uber.org/zap"
+
"github.com/apache/yunikorn-core/pkg/locking"
+ "github.com/apache/yunikorn-core/pkg/log"
"github.com/apache/yunikorn-core/pkg/metrics"
"github.com/apache/yunikorn-scheduler-interface/lib/go/si"
)
-const defaultEventStoreSize = 1000
-
// The EventStore operates under the following assumptions:
// - there is a cap for the number of events stored
// - the CollectEvents() function clears the currently stored events in the EventStore
@@ -33,18 +34,17 @@ const defaultEventStoreSize = 1000
// Assuming the rate of events generated by the scheduler component in a given time period
// is high, calling CollectEvents() periodically should be fine.
type EventStore struct {
- events []*si.EventRecord
- idx int // points where to store the next event
+ events []*si.EventRecord
+ idx uint64 // points where to store the next event
+ size uint64
+ lastSize uint64
locking.RWMutex
}
-func newEventStore() *EventStore {
- return newEventStoreWithSize(defaultEventStoreSize)
-}
-
-func newEventStoreWithSize(size int) *EventStore {
+func newEventStore(size uint64) *EventStore {
return &EventStore{
events: make([]*si.EventRecord, size),
+ size: size,
}
}
@@ -52,7 +52,7 @@ func (es *EventStore) Store(event *si.EventRecord) {
es.Lock()
defer es.Unlock()
- if es.idx == len(es.events) {
+ if es.idx == uint64(len(es.events)) {
metrics.GetEventMetrics().IncEventsNotStored()
return
}
@@ -67,16 +67,26 @@ func (es *EventStore) CollectEvents() []*si.EventRecord {
defer es.Unlock()
messages := es.events[:es.idx]
- es.events = make([]*si.EventRecord, defaultEventStoreSize)
+ if es.size != es.lastSize {
+ log.Log(log.Events).Info("Resizing event store",
zap.Uint64("last", es.lastSize), zap.Uint64("new", es.size))
+ es.events = make([]*si.EventRecord, es.size)
+ }
es.idx = 0
+ es.lastSize = es.size
metrics.GetEventMetrics().AddEventsCollected(len(messages))
return messages
}
-func (es *EventStore) CountStoredEvents() int {
+func (es *EventStore) CountStoredEvents() uint64 {
es.RLock()
defer es.RUnlock()
return es.idx
}
+
+func (es *EventStore) SetStoreSize(size uint64) {
+ es.Lock()
+ defer es.Unlock()
+ es.size = size
+}
diff --git a/pkg/events/event_store_test.go b/pkg/events/event_store_test.go
index 7bb2c17c..c9abbeca 100644
--- a/pkg/events/event_store_test.go
+++ b/pkg/events/event_store_test.go
@@ -30,8 +30,8 @@ import (
// the fields of an event should match after stored and retrieved
func TestStoreAndRetrieve(t *testing.T) {
- store := newEventStore()
- event := &si.EventRecord{
+ store := newEventStore(1000)
+ event1 := &si.EventRecord{
Type: si.EventRecord_REQUEST,
EventChangeDetail: si.EventRecord_DETAILS_NONE,
EventChangeType: si.EventRecord_NONE,
@@ -39,12 +39,21 @@ func TestStoreAndRetrieve(t *testing.T) {
ReferenceID: "app1",
Message: "message",
}
- store.Store(event)
+ event2 := &si.EventRecord{
+ Type: si.EventRecord_REQUEST,
+ EventChangeDetail: si.EventRecord_DETAILS_NONE,
+ EventChangeType: si.EventRecord_NONE,
+ ObjectID: "alloc2",
+ ReferenceID: "app1",
+ Message: "message",
+ }
+ store.Store(event1)
+ store.Store(event2)
records := store.CollectEvents()
- assert.Equal(t, len(records), 1)
- record := records[0]
- assert.DeepEqual(t, record, event,
cmpopts.IgnoreUnexported(si.EventRecord{}))
+ assert.Equal(t, len(records), 2)
+ assert.DeepEqual(t, records[0], event1,
cmpopts.IgnoreUnexported(si.EventRecord{}))
+ assert.DeepEqual(t, records[1], event2,
cmpopts.IgnoreUnexported(si.EventRecord{}))
// calling CollectEvents erases the eventChannel map
records = store.CollectEvents()
@@ -54,7 +63,7 @@ func TestStoreAndRetrieve(t *testing.T) {
// if we push more events to the EventStore than its
// allowed maximum, those that couldn't fit will be omitted
func TestStoreWithLimitedSize(t *testing.T) {
- store := newEventStoreWithSize(3)
+ store := newEventStore(3)
for i := 0; i < 5; i++ {
event := &si.EventRecord{
@@ -68,3 +77,26 @@ func TestStoreWithLimitedSize(t *testing.T) {
records := store.CollectEvents()
assert.Equal(t, len(records), 3)
}
+
+func TestSetStoreSize(t *testing.T) {
+ store := newEventStore(5)
+ // store 5 events
+ for i := 0; i < 5; i++ {
+ store.Store(&si.EventRecord{
+ Type: si.EventRecord_REQUEST,
+ EventChangeDetail: si.EventRecord_DETAILS_NONE,
+ EventChangeType: si.EventRecord_NONE,
+ ObjectID: "alloc" + strconv.Itoa(i),
+ ReferenceID: "app1",
+ })
+ }
+
+ // set smaller size
+ store.SetStoreSize(3)
+ assert.Equal(t, uint64(3), store.size)
+
+ // collect events & make sure events are not lost
+ events := store.CollectEvents()
+ assert.Equal(t, 5, len(events))
+ assert.Equal(t, 3, len(store.events))
+}
diff --git a/pkg/events/event_system.go b/pkg/events/event_system.go
index 86fbd8d2..2702d57c 100644
--- a/pkg/events/event_system.go
+++ b/pkg/events/event_system.go
@@ -23,6 +23,8 @@ import (
"sync"
"time"
+ "go.uber.org/zap"
+
"github.com/apache/yunikorn-core/pkg/common"
"github.com/apache/yunikorn-core/pkg/common/configs"
"github.com/apache/yunikorn-core/pkg/locking"
@@ -92,7 +94,7 @@ type EventSystemImpl struct {
stopped bool
trackingEnabled bool
- requestCapacity int
+ requestCapacity uint64
ringBufferCapacity uint64
locking.RWMutex
@@ -129,7 +131,7 @@ func (ec *EventSystemImpl) IsEventTrackingEnabled() bool {
}
// GetRequestCapacity returns the capacity of an intermediate storage which is
used by the shim publisher.
-func (ec *EventSystemImpl) GetRequestCapacity() int {
+func (ec *EventSystemImpl) GetRequestCapacity() uint64 {
ec.RLock()
defer ec.RUnlock()
return ec.requestCapacity
@@ -145,16 +147,16 @@ func (ec *EventSystemImpl) GetRingBufferCapacity() uint64 {
// Init Initializes the event system.
// Only exported for testing.
func Init() {
- store := newEventStore()
- buffer := newEventRingBuffer(defaultRingBufferSize)
+ store := newEventStore(getRequestCapacity())
+ buffer := newEventRingBuffer(getRingBufferCapacity())
ev = &EventSystemImpl{
Store: store,
channel: make(chan *si.EventRecord,
defaultEventChannelSize),
stop: make(chan bool),
stopped: false,
- publisher: CreateShimPublisher(store),
eventBuffer: buffer,
eventSystemId: fmt.Sprintf("event-system-%d",
time.Now().Unix()),
+ publisher: CreateShimPublisher(store),
streaming: NewEventStreaming(buffer),
}
}
@@ -174,9 +176,9 @@ func (ec *EventSystemImpl) StartServiceWithPublisher(withPublisher bool) {
go ec.reloadConfig()
})
- ec.trackingEnabled = ec.readIsTrackingEnabled()
- ec.ringBufferCapacity = ec.readRingBufferCapacity()
- ec.requestCapacity = ec.readRequestCapacity()
+ ec.trackingEnabled = isTrackingEnabled()
+ ec.ringBufferCapacity = getRingBufferCapacity()
+ ec.requestCapacity = getRequestCapacity()
go func() {
log.Log(log.Events).Info("Starting event system handler")
@@ -234,22 +236,36 @@ func (ec *EventSystemImpl) AddEvent(event *si.EventRecord) {
}
}
-func (ec *EventSystemImpl) readIsTrackingEnabled() bool {
+func isTrackingEnabled() bool {
return common.GetConfigurationBool(configs.GetConfigMap(),
configs.CMEventTrackingEnabled, configs.DefaultEventTrackingEnabled)
}
-func (ec *EventSystemImpl) readRequestCapacity() int {
- return common.GetConfigurationInt(configs.GetConfigMap(),
configs.CMEventRequestCapacity, configs.DefaultEventRequestCapacity)
+func getRequestCapacity() uint64 {
+ capacity := common.GetConfigurationUint(configs.GetConfigMap(),
configs.CMEventRequestCapacity, configs.DefaultEventRequestCapacity)
+ if capacity == 0 {
+ log.Log(log.Events).Warn("Request capacity is set to 0, using
default",
+ zap.String("property", configs.CMEventRequestCapacity),
+ zap.Uint64("default",
configs.DefaultEventRequestCapacity))
+ return configs.DefaultEventRequestCapacity
+ }
+ return capacity
}
-func (ec *EventSystemImpl) readRingBufferCapacity() uint64 {
- return common.GetConfigurationUint(configs.GetConfigMap(),
configs.CMEventRingBufferCapacity, configs.DefaultEventRingBufferCapacity)
+func getRingBufferCapacity() uint64 {
+ capacity := common.GetConfigurationUint(configs.GetConfigMap(),
configs.CMEventRingBufferCapacity, configs.DefaultEventRingBufferCapacity)
+ if capacity == 0 {
+ log.Log(log.Events).Warn("Ring buffer capacity is set to 0,
using default",
+ zap.String("property",
configs.CMEventRingBufferCapacity),
+ zap.Uint64("default",
configs.DefaultEventRingBufferCapacity))
+ return configs.DefaultEventRingBufferCapacity
+ }
+ return capacity
}
func (ec *EventSystemImpl) isRestartNeeded() bool {
ec.RLock()
defer ec.RUnlock()
- return ec.readIsTrackingEnabled() != ec.trackingEnabled
+ return isTrackingEnabled() != ec.trackingEnabled
}
// Restart restarts the event system, used during config update.
@@ -273,18 +289,16 @@ func (ec *EventSystemImpl) CloseAllStreams() {
}
func (ec *EventSystemImpl) reloadConfig() {
- ec.updateRequestCapacity()
+ ec.Lock()
+ ec.requestCapacity = getRequestCapacity()
+ ec.ringBufferCapacity = getRingBufferCapacity()
+ ec.Unlock()
- // resize the ring buffer with new capacity
- ec.eventBuffer.Resize(ec.readRingBufferCapacity())
+ // resize the ring buffer & event store with new capacity
+ ec.Store.SetStoreSize(ec.requestCapacity)
+ ec.eventBuffer.Resize(ec.ringBufferCapacity)
if ec.isRestartNeeded() {
ec.Restart()
}
}
-
-func (ec *EventSystemImpl) updateRequestCapacity() {
- ec.Lock()
- defer ec.Unlock()
- ec.requestCapacity = ec.readRequestCapacity()
-}
diff --git a/pkg/events/event_system_test.go b/pkg/events/event_system_test.go
index f00961e2..5d2f9d05 100644
--- a/pkg/events/event_system_test.go
+++ b/pkg/events/event_system_test.go
@@ -120,17 +120,17 @@ func TestConfigUpdate(t *testing.T) {
assert.Assert(t, eventSystem.IsEventTrackingEnabled())
assert.Equal(t, eventSystem.GetRingBufferCapacity(),
uint64(configs.DefaultEventRingBufferCapacity))
- assert.Equal(t, eventSystem.GetRequestCapacity(),
configs.DefaultEventRequestCapacity)
+ assert.Equal(t, eventSystem.GetRequestCapacity(),
uint64(configs.DefaultEventRequestCapacity))
assert.Equal(t, eventSystem.eventBuffer.capacity,
uint64(configs.DefaultEventRingBufferCapacity))
// update config and wait for refresh
var newRingBufferCapacity uint64 = 123
- newRequestCapacity := 555
+ newRequestCapacity := uint64(555)
configs.SetConfigMap(
map[string]string{configs.CMEventTrackingEnabled: "false",
configs.CMEventRingBufferCapacity:
strconv.FormatUint(newRingBufferCapacity, 10),
- configs.CMEventRequestCapacity:
strconv.Itoa(int(newRequestCapacity)),
+ configs.CMEventRequestCapacity:
strconv.FormatUint(newRequestCapacity, 10),
})
err := common.WaitForCondition(func() bool {
return !eventSystem.IsEventTrackingEnabled()
@@ -154,3 +154,51 @@ func TestEventStreaming(t *testing.T) {
assert.Equal(t, 1, len(streams))
assert.Equal(t, "test", streams[0].Name)
}
+
+func TestRequestCapacity(t *testing.T) {
+ config := map[string]string{
+ configs.CMEventRequestCapacity: "123",
+ }
+ configs.SetConfigMap(config)
+
+ capacity := getRequestCapacity()
+ assert.Equal(t, uint64(123), capacity)
+
+ config = map[string]string{
+ configs.CMEventRequestCapacity: "0",
+ }
+ configs.SetConfigMap(config)
+ capacity = getRequestCapacity()
+ assert.Equal(t, uint64(configs.DefaultEventRequestCapacity), capacity)
+
+ config = map[string]string{
+ configs.CMEventRequestCapacity: "xyz",
+ }
+ configs.SetConfigMap(config)
+ capacity = getRequestCapacity()
+ assert.Equal(t, uint64(configs.DefaultEventRequestCapacity), capacity)
+}
+
+func TestRingBufferCapacity(t *testing.T) {
+ config := map[string]string{
+ configs.CMEventRingBufferCapacity: "123",
+ }
+ configs.SetConfigMap(config)
+
+ capacity := getRingBufferCapacity()
+ assert.Equal(t, uint64(123), capacity)
+
+ config = map[string]string{
+ configs.CMEventRingBufferCapacity: "0",
+ }
+ configs.SetConfigMap(config)
+ capacity = getRingBufferCapacity()
+ assert.Equal(t, uint64(configs.DefaultEventRingBufferCapacity),
capacity)
+
+ config = map[string]string{
+ configs.CMEventRingBufferCapacity: "xyz",
+ }
+ configs.SetConfigMap(config)
+ capacity = getRingBufferCapacity()
+ assert.Equal(t, uint64(configs.DefaultEventRingBufferCapacity),
capacity)
+}
diff --git a/pkg/scheduler/objects/application_test.go b/pkg/scheduler/objects/application_test.go
index 034da766..ad0dc58c 100644
--- a/pkg/scheduler/objects/application_test.go
+++ b/pkg/scheduler/objects/application_test.go
@@ -373,7 +373,7 @@ func TestAddAllocAsk(t *testing.T) {
}
// test add alloc ask event
- noEvents := 0
+ noEvents := uint64(0)
err = common.WaitFor(10*time.Millisecond, time.Second, func() bool {
fmt.Printf("checking event length: %d\n",
eventSystem.Store.CountStoredEvents())
noEvents = eventSystem.Store.CountStoredEvents()
@@ -1994,7 +1994,7 @@ func TestAskEvents(t *testing.T) {
err = app.AddAllocationAsk(ask)
assert.NilError(t, err)
app.RemoveAllocationAsk(ask.allocationKey)
- noEvents := 0
+ noEvents := uint64(0)
err = common.WaitFor(10*time.Millisecond, time.Second, func() bool {
noEvents = eventSystem.Store.CountStoredEvents()
return noEvents == 3
@@ -2066,7 +2066,7 @@ func TestAllocationEvents(t *testing.T) { //nolint:funlen
app.AddAllocation(alloc2)
app.RemoveAllocation(alloc1.GetAllocationID(),
si.TerminationType_STOPPED_BY_RM)
app.RemoveAllocation(alloc2.GetAllocationID(),
si.TerminationType_PLACEHOLDER_REPLACED)
- noEvents := 0
+ noEvents := uint64(0)
err = common.WaitFor(10*time.Millisecond, time.Second, func() bool {
noEvents = eventSystem.Store.CountStoredEvents()
return noEvents == 5
@@ -2189,7 +2189,7 @@ func TestPlaceholderLargerEvent(t *testing.T) {
return nil
})
- noEvents := 0
+ noEvents := uint64(0)
err = common.WaitFor(10*time.Millisecond, time.Second, func() bool {
noEvents = eventSystem.Store.CountStoredEvents()
return noEvents == 4
diff --git a/pkg/scheduler/objects/queue_test.go b/pkg/scheduler/objects/queue_test.go
index f9111d65..46e8f749 100644
--- a/pkg/scheduler/objects/queue_test.go
+++ b/pkg/scheduler/objects/queue_test.go
@@ -2565,7 +2565,7 @@ func TestQueueEvents(t *testing.T) {
app := newApplication(appID0, "default", "root")
queue.AddApplication(app)
queue.RemoveApplication(app)
- noEvents := 0
+ noEvents := uint64(0)
err = common.WaitFor(10*time.Millisecond, time.Second, func() bool {
noEvents = eventSystem.Store.CountStoredEvents()
return noEvents == 5
diff --git a/pkg/scheduler/partition_test.go b/pkg/scheduler/partition_test.go
index c42d784e..0b296910 100644
--- a/pkg/scheduler/partition_test.go
+++ b/pkg/scheduler/partition_test.go
@@ -3782,7 +3782,7 @@ func TestNewQueueEvents(t *testing.T) {
User: "test",
})
assert.NilError(t, err)
- noEvents := 0
+ noEvents := uint64(0)
err = common.WaitFor(10*time.Millisecond, time.Second, func() bool {
noEvents = eventSystem.Store.CountStoredEvents()
return noEvents == 3
diff --git a/pkg/webservice/handlers_test.go b/pkg/webservice/handlers_test.go
index dd264d28..4d8ca9ed 100644
--- a/pkg/webservice/handlers_test.go
+++ b/pkg/webservice/handlers_test.go
@@ -2372,7 +2372,7 @@ func addEvents(t *testing.T) (appEvent, nodeEvent, queueEvent *si.EventRecord) {
ReferenceID: "app",
}
ev.AddEvent(queueEvent)
- noEvents := 0
+ noEvents := uint64(0)
err := common.WaitFor(10*time.Millisecond, time.Second, func() bool {
noEvents = ev.Store.CountStoredEvents()
return noEvents == 3
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]