This is an automated email from the ASF dual-hosted git repository.

pbacsko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/yunikorn-core.git


The following commit(s) were added to refs/heads/master by this push:
     new 7b807cf1 [YUNIKORN-2513] Fix event system to use event.requestCapacity (#833)
7b807cf1 is described below

commit 7b807cf10d02f42e2e6d312391d6e2c6ddf2e607
Author: Peter Bacsko <[email protected]>
AuthorDate: Sun Apr 7 02:48:57 2024 +0200

    [YUNIKORN-2513] Fix event system to use event.requestCapacity (#833)
    
    Closes: #833
    
    Signed-off-by: Peter Bacsko <[email protected]>
---
 pkg/events/event_publisher_test.go        |  6 ++--
 pkg/events/event_store.go                 | 34 +++++++++++-------
 pkg/events/event_store_test.go            | 46 ++++++++++++++++++++----
 pkg/events/event_system.go                | 60 +++++++++++++++++++------------
 pkg/events/event_system_test.go           | 54 ++++++++++++++++++++++++++--
 pkg/scheduler/objects/application_test.go |  8 ++---
 pkg/scheduler/objects/queue_test.go       |  2 +-
 pkg/scheduler/partition_test.go           |  2 +-
 pkg/webservice/handlers_test.go           |  2 +-
 9 files changed, 159 insertions(+), 55 deletions(-)

diff --git a/pkg/events/event_publisher_test.go b/pkg/events/event_publisher_test.go
index d83ffae8..79fc95fc 100644
--- a/pkg/events/event_publisher_test.go
+++ b/pkg/events/event_publisher_test.go
@@ -38,7 +38,7 @@ func TestCreateShimPublisher(t *testing.T) {
 
 // StartService() and Stop() functions should not cause panic
 func TestServiceStartStopInternal(t *testing.T) {
-       store := newEventStore()
+       store := newEventStore(1000)
        publisher := CreateShimPublisher(store)
        publisher.StartService()
        defer publisher.Stop()
@@ -46,7 +46,7 @@ func TestServiceStartStopInternal(t *testing.T) {
 }
 
 func TestNoFillWithoutEventPluginRegistered(t *testing.T) {
-       store := newEventStore()
+       store := newEventStore(1000)
        publisher := CreateShimPublisher(store)
        publisher.pushEventInterval = time.Millisecond
        publisher.StartService()
@@ -76,7 +76,7 @@ func TestPublisherSendsEvent(t *testing.T) {
                t.Fatal("could not register event plugin for test")
        }
 
-       store := newEventStore()
+       store := newEventStore(1000)
        publisher := CreateShimPublisher(store)
        publisher.pushEventInterval = time.Millisecond
        publisher.StartService()
diff --git a/pkg/events/event_store.go b/pkg/events/event_store.go
index ff27079b..b9cb38f0 100644
--- a/pkg/events/event_store.go
+++ b/pkg/events/event_store.go
@@ -19,13 +19,14 @@
 package events
 
 import (
+       "go.uber.org/zap"
+
        "github.com/apache/yunikorn-core/pkg/locking"
+       "github.com/apache/yunikorn-core/pkg/log"
        "github.com/apache/yunikorn-core/pkg/metrics"
        "github.com/apache/yunikorn-scheduler-interface/lib/go/si"
 )
 
-const defaultEventStoreSize = 1000
-
 // The EventStore operates under the following assumptions:
 //   - there is a cap for the number of events stored
 //   - the CollectEvents() function clears the currently stored events in the EventStore
@@ -33,18 +34,17 @@ const defaultEventStoreSize = 1000
 // Assuming the rate of events generated by the scheduler component in a given time period
 // is high, calling CollectEvents() periodically should be fine.
 type EventStore struct {
-       events []*si.EventRecord
-       idx    int // points where to store the next event
+       events   []*si.EventRecord
+       idx      uint64 // points where to store the next event
+       size     uint64
+       lastSize uint64
        locking.RWMutex
 }
 
-func newEventStore() *EventStore {
-       return newEventStoreWithSize(defaultEventStoreSize)
-}
-
-func newEventStoreWithSize(size int) *EventStore {
+func newEventStore(size uint64) *EventStore {
        return &EventStore{
                events: make([]*si.EventRecord, size),
+               size:   size,
        }
 }
 
@@ -52,7 +52,7 @@ func (es *EventStore) Store(event *si.EventRecord) {
        es.Lock()
        defer es.Unlock()
 
-       if es.idx == len(es.events) {
+       if es.idx == uint64(len(es.events)) {
                metrics.GetEventMetrics().IncEventsNotStored()
                return
        }
@@ -67,16 +67,26 @@ func (es *EventStore) CollectEvents() []*si.EventRecord {
        defer es.Unlock()
 
        messages := es.events[:es.idx]
-       es.events = make([]*si.EventRecord, defaultEventStoreSize)
+       if es.size != es.lastSize {
+               log.Log(log.Events).Info("Resizing event store", zap.Uint64("last", es.lastSize), zap.Uint64("new", es.size))
+               es.events = make([]*si.EventRecord, es.size)
+       }
        es.idx = 0
+       es.lastSize = es.size
 
        metrics.GetEventMetrics().AddEventsCollected(len(messages))
        return messages
 }
 
-func (es *EventStore) CountStoredEvents() int {
+func (es *EventStore) CountStoredEvents() uint64 {
        es.RLock()
        defer es.RUnlock()
 
        return es.idx
 }
+
+func (es *EventStore) SetStoreSize(size uint64) {
+       es.Lock()
+       defer es.Unlock()
+       es.size = size
+}
diff --git a/pkg/events/event_store_test.go b/pkg/events/event_store_test.go
index 7bb2c17c..c9abbeca 100644
--- a/pkg/events/event_store_test.go
+++ b/pkg/events/event_store_test.go
@@ -30,8 +30,8 @@ import (
 
 // the fields of an event should match after stored and retrieved
 func TestStoreAndRetrieve(t *testing.T) {
-       store := newEventStore()
-       event := &si.EventRecord{
+       store := newEventStore(1000)
+       event1 := &si.EventRecord{
                Type:              si.EventRecord_REQUEST,
                EventChangeDetail: si.EventRecord_DETAILS_NONE,
                EventChangeType:   si.EventRecord_NONE,
@@ -39,12 +39,21 @@ func TestStoreAndRetrieve(t *testing.T) {
                ReferenceID:       "app1",
                Message:           "message",
        }
-       store.Store(event)
+       event2 := &si.EventRecord{
+               Type:              si.EventRecord_REQUEST,
+               EventChangeDetail: si.EventRecord_DETAILS_NONE,
+               EventChangeType:   si.EventRecord_NONE,
+               ObjectID:          "alloc2",
+               ReferenceID:       "app1",
+               Message:           "message",
+       }
+       store.Store(event1)
+       store.Store(event2)
 
        records := store.CollectEvents()
-       assert.Equal(t, len(records), 1)
-       record := records[0]
-       assert.DeepEqual(t, record, event, cmpopts.IgnoreUnexported(si.EventRecord{}))
+       assert.Equal(t, len(records), 2)
+       assert.DeepEqual(t, records[0], event1, cmpopts.IgnoreUnexported(si.EventRecord{}))
+       assert.DeepEqual(t, records[1], event2, cmpopts.IgnoreUnexported(si.EventRecord{}))
 
        // calling CollectEvents erases the eventChannel map
        records = store.CollectEvents()
@@ -54,7 +63,7 @@ func TestStoreAndRetrieve(t *testing.T) {
 // if we push more events to the EventStore than its
 // allowed maximum, those that couldn't fit will be omitted
 func TestStoreWithLimitedSize(t *testing.T) {
-       store := newEventStoreWithSize(3)
+       store := newEventStore(3)
 
        for i := 0; i < 5; i++ {
                event := &si.EventRecord{
@@ -68,3 +77,26 @@ func TestStoreWithLimitedSize(t *testing.T) {
        records := store.CollectEvents()
        assert.Equal(t, len(records), 3)
 }
+
+func TestSetStoreSize(t *testing.T) {
+       store := newEventStore(5)
+       // store 5 events
+       for i := 0; i < 5; i++ {
+               store.Store(&si.EventRecord{
+                       Type:              si.EventRecord_REQUEST,
+                       EventChangeDetail: si.EventRecord_DETAILS_NONE,
+                       EventChangeType:   si.EventRecord_NONE,
+                       ObjectID:          "alloc" + strconv.Itoa(i),
+                       ReferenceID:       "app1",
+               })
+       }
+
+       // set smaller size
+       store.SetStoreSize(3)
+       assert.Equal(t, uint64(3), store.size)
+
+       // collect events & make sure events are not lost
+       events := store.CollectEvents()
+       assert.Equal(t, 5, len(events))
+       assert.Equal(t, 3, len(store.events))
+}
diff --git a/pkg/events/event_system.go b/pkg/events/event_system.go
index 86fbd8d2..2702d57c 100644
--- a/pkg/events/event_system.go
+++ b/pkg/events/event_system.go
@@ -23,6 +23,8 @@ import (
        "sync"
        "time"
 
+       "go.uber.org/zap"
+
        "github.com/apache/yunikorn-core/pkg/common"
        "github.com/apache/yunikorn-core/pkg/common/configs"
        "github.com/apache/yunikorn-core/pkg/locking"
@@ -92,7 +94,7 @@ type EventSystemImpl struct {
        stopped bool
 
        trackingEnabled    bool
-       requestCapacity    int
+       requestCapacity    uint64
        ringBufferCapacity uint64
 
        locking.RWMutex
@@ -129,7 +131,7 @@ func (ec *EventSystemImpl) IsEventTrackingEnabled() bool {
 }
 
 // GetRequestCapacity returns the capacity of an intermediate storage which is used by the shim publisher.
-func (ec *EventSystemImpl) GetRequestCapacity() int {
+func (ec *EventSystemImpl) GetRequestCapacity() uint64 {
        ec.RLock()
        defer ec.RUnlock()
        return ec.requestCapacity
@@ -145,16 +147,16 @@ func (ec *EventSystemImpl) GetRingBufferCapacity() uint64 {
 // Init Initializes the event system.
 // Only exported for testing.
 func Init() {
-       store := newEventStore()
-       buffer := newEventRingBuffer(defaultRingBufferSize)
+       store := newEventStore(getRequestCapacity())
+       buffer := newEventRingBuffer(getRingBufferCapacity())
        ev = &EventSystemImpl{
                Store:         store,
                channel:       make(chan *si.EventRecord, defaultEventChannelSize),
                stop:          make(chan bool),
                stopped:       false,
-               publisher:     CreateShimPublisher(store),
                eventBuffer:   buffer,
                eventSystemId: fmt.Sprintf("event-system-%d", time.Now().Unix()),
+               publisher:     CreateShimPublisher(store),
                streaming:     NewEventStreaming(buffer),
        }
 }
@@ -174,9 +176,9 @@ func (ec *EventSystemImpl) StartServiceWithPublisher(withPublisher bool) {
                go ec.reloadConfig()
        })
 
-       ec.trackingEnabled = ec.readIsTrackingEnabled()
-       ec.ringBufferCapacity = ec.readRingBufferCapacity()
-       ec.requestCapacity = ec.readRequestCapacity()
+       ec.trackingEnabled = isTrackingEnabled()
+       ec.ringBufferCapacity = getRingBufferCapacity()
+       ec.requestCapacity = getRequestCapacity()
 
        go func() {
                log.Log(log.Events).Info("Starting event system handler")
@@ -234,22 +236,36 @@ func (ec *EventSystemImpl) AddEvent(event *si.EventRecord) {
        }
 }
 
-func (ec *EventSystemImpl) readIsTrackingEnabled() bool {
+func isTrackingEnabled() bool {
        return common.GetConfigurationBool(configs.GetConfigMap(), configs.CMEventTrackingEnabled, configs.DefaultEventTrackingEnabled)
 }
 
-func (ec *EventSystemImpl) readRequestCapacity() int {
-       return common.GetConfigurationInt(configs.GetConfigMap(), configs.CMEventRequestCapacity, configs.DefaultEventRequestCapacity)
+func getRequestCapacity() uint64 {
+       capacity := common.GetConfigurationUint(configs.GetConfigMap(), configs.CMEventRequestCapacity, configs.DefaultEventRequestCapacity)
+       if capacity == 0 {
+               log.Log(log.Events).Warn("Request capacity is set to 0, using default",
+                       zap.String("property", configs.CMEventRequestCapacity),
+                       zap.Uint64("default", configs.DefaultEventRequestCapacity))
+               return configs.DefaultEventRequestCapacity
+       }
+       return capacity
 }
 
-func (ec *EventSystemImpl) readRingBufferCapacity() uint64 {
-       return common.GetConfigurationUint(configs.GetConfigMap(), configs.CMEventRingBufferCapacity, configs.DefaultEventRingBufferCapacity)
+func getRingBufferCapacity() uint64 {
+       capacity := common.GetConfigurationUint(configs.GetConfigMap(), configs.CMEventRingBufferCapacity, configs.DefaultEventRingBufferCapacity)
+       if capacity == 0 {
+               log.Log(log.Events).Warn("Ring buffer capacity is set to 0, using default",
+                       zap.String("property", configs.CMEventRingBufferCapacity),
+                       zap.Uint64("default", configs.DefaultEventRingBufferCapacity))
+               return configs.DefaultEventRingBufferCapacity
+       }
+       return capacity
 }
 
 func (ec *EventSystemImpl) isRestartNeeded() bool {
        ec.RLock()
        defer ec.RUnlock()
-       return ec.readIsTrackingEnabled() != ec.trackingEnabled
+       return isTrackingEnabled() != ec.trackingEnabled
 }
 
 // Restart restarts the event system, used during config update.
@@ -273,18 +289,16 @@ func (ec *EventSystemImpl) CloseAllStreams() {
 }
 
 func (ec *EventSystemImpl) reloadConfig() {
-       ec.updateRequestCapacity()
+       ec.Lock()
+       ec.requestCapacity = getRequestCapacity()
+       ec.ringBufferCapacity = getRingBufferCapacity()
+       ec.Unlock()
 
-       // resize the ring buffer with new capacity
-       ec.eventBuffer.Resize(ec.readRingBufferCapacity())
+       // resize the ring buffer & event store with new capacity
+       ec.Store.SetStoreSize(ec.requestCapacity)
+       ec.eventBuffer.Resize(ec.ringBufferCapacity)
 
        if ec.isRestartNeeded() {
                ec.Restart()
        }
 }
-
-func (ec *EventSystemImpl) updateRequestCapacity() {
-       ec.Lock()
-       defer ec.Unlock()
-       ec.requestCapacity = ec.readRequestCapacity()
-}
diff --git a/pkg/events/event_system_test.go b/pkg/events/event_system_test.go
index f00961e2..5d2f9d05 100644
--- a/pkg/events/event_system_test.go
+++ b/pkg/events/event_system_test.go
@@ -120,17 +120,17 @@ func TestConfigUpdate(t *testing.T) {
 
        assert.Assert(t, eventSystem.IsEventTrackingEnabled())
        assert.Equal(t, eventSystem.GetRingBufferCapacity(), uint64(configs.DefaultEventRingBufferCapacity))
-       assert.Equal(t, eventSystem.GetRequestCapacity(), configs.DefaultEventRequestCapacity)
+       assert.Equal(t, eventSystem.GetRequestCapacity(), uint64(configs.DefaultEventRequestCapacity))
        assert.Equal(t, eventSystem.eventBuffer.capacity, uint64(configs.DefaultEventRingBufferCapacity))
 
        // update config and wait for refresh
        var newRingBufferCapacity uint64 = 123
-       newRequestCapacity := 555
+       newRequestCapacity := uint64(555)
 
        configs.SetConfigMap(
                map[string]string{configs.CMEventTrackingEnabled: "false",
                        configs.CMEventRingBufferCapacity: strconv.FormatUint(newRingBufferCapacity, 10),
-                       configs.CMEventRequestCapacity:    strconv.Itoa(int(newRequestCapacity)),
+                       configs.CMEventRequestCapacity:    strconv.FormatUint(newRequestCapacity, 10),
                })
        err := common.WaitForCondition(func() bool {
                return !eventSystem.IsEventTrackingEnabled()
@@ -154,3 +154,51 @@ func TestEventStreaming(t *testing.T) {
        assert.Equal(t, 1, len(streams))
        assert.Equal(t, "test", streams[0].Name)
 }
+
+func TestRequestCapacity(t *testing.T) {
+       config := map[string]string{
+               configs.CMEventRequestCapacity: "123",
+       }
+       configs.SetConfigMap(config)
+
+       capacity := getRequestCapacity()
+       assert.Equal(t, uint64(123), capacity)
+
+       config = map[string]string{
+               configs.CMEventRequestCapacity: "0",
+       }
+       configs.SetConfigMap(config)
+       capacity = getRequestCapacity()
+       assert.Equal(t, uint64(configs.DefaultEventRequestCapacity), capacity)
+
+       config = map[string]string{
+               configs.CMEventRequestCapacity: "xyz",
+       }
+       configs.SetConfigMap(config)
+       capacity = getRequestCapacity()
+       assert.Equal(t, uint64(configs.DefaultEventRequestCapacity), capacity)
+}
+
+func TestRingBufferCapacity(t *testing.T) {
+       config := map[string]string{
+               configs.CMEventRingBufferCapacity: "123",
+       }
+       configs.SetConfigMap(config)
+
+       capacity := getRingBufferCapacity()
+       assert.Equal(t, uint64(123), capacity)
+
+       config = map[string]string{
+               configs.CMEventRingBufferCapacity: "0",
+       }
+       configs.SetConfigMap(config)
+       capacity = getRingBufferCapacity()
+       assert.Equal(t, uint64(configs.DefaultEventRingBufferCapacity), capacity)
+
+       config = map[string]string{
+               configs.CMEventRingBufferCapacity: "xyz",
+       }
+       configs.SetConfigMap(config)
+       capacity = getRingBufferCapacity()
+       assert.Equal(t, uint64(configs.DefaultEventRingBufferCapacity), capacity)
+}
diff --git a/pkg/scheduler/objects/application_test.go b/pkg/scheduler/objects/application_test.go
index 034da766..ad0dc58c 100644
--- a/pkg/scheduler/objects/application_test.go
+++ b/pkg/scheduler/objects/application_test.go
@@ -373,7 +373,7 @@ func TestAddAllocAsk(t *testing.T) {
        }
 
        // test add alloc ask event
-       noEvents := 0
+       noEvents := uint64(0)
        err = common.WaitFor(10*time.Millisecond, time.Second, func() bool {
                fmt.Printf("checking event length: %d\n", eventSystem.Store.CountStoredEvents())
                noEvents = eventSystem.Store.CountStoredEvents()
@@ -1994,7 +1994,7 @@ func TestAskEvents(t *testing.T) {
        err = app.AddAllocationAsk(ask)
        assert.NilError(t, err)
        app.RemoveAllocationAsk(ask.allocationKey)
-       noEvents := 0
+       noEvents := uint64(0)
        err = common.WaitFor(10*time.Millisecond, time.Second, func() bool {
                noEvents = eventSystem.Store.CountStoredEvents()
                return noEvents == 3
@@ -2066,7 +2066,7 @@ func TestAllocationEvents(t *testing.T) { //nolint:funlen
        app.AddAllocation(alloc2)
        app.RemoveAllocation(alloc1.GetAllocationID(), si.TerminationType_STOPPED_BY_RM)
        app.RemoveAllocation(alloc2.GetAllocationID(), si.TerminationType_PLACEHOLDER_REPLACED)
-       noEvents := 0
+       noEvents := uint64(0)
        err = common.WaitFor(10*time.Millisecond, time.Second, func() bool {
                noEvents = eventSystem.Store.CountStoredEvents()
                return noEvents == 5
@@ -2189,7 +2189,7 @@ func TestPlaceholderLargerEvent(t *testing.T) {
                return nil
        })
 
-       noEvents := 0
+       noEvents := uint64(0)
        err = common.WaitFor(10*time.Millisecond, time.Second, func() bool {
                noEvents = eventSystem.Store.CountStoredEvents()
                return noEvents == 4
diff --git a/pkg/scheduler/objects/queue_test.go b/pkg/scheduler/objects/queue_test.go
index f9111d65..46e8f749 100644
--- a/pkg/scheduler/objects/queue_test.go
+++ b/pkg/scheduler/objects/queue_test.go
@@ -2565,7 +2565,7 @@ func TestQueueEvents(t *testing.T) {
        app := newApplication(appID0, "default", "root")
        queue.AddApplication(app)
        queue.RemoveApplication(app)
-       noEvents := 0
+       noEvents := uint64(0)
        err = common.WaitFor(10*time.Millisecond, time.Second, func() bool {
                noEvents = eventSystem.Store.CountStoredEvents()
                return noEvents == 5
diff --git a/pkg/scheduler/partition_test.go b/pkg/scheduler/partition_test.go
index c42d784e..0b296910 100644
--- a/pkg/scheduler/partition_test.go
+++ b/pkg/scheduler/partition_test.go
@@ -3782,7 +3782,7 @@ func TestNewQueueEvents(t *testing.T) {
                User: "test",
        })
        assert.NilError(t, err)
-       noEvents := 0
+       noEvents := uint64(0)
        err = common.WaitFor(10*time.Millisecond, time.Second, func() bool {
                noEvents = eventSystem.Store.CountStoredEvents()
                return noEvents == 3
diff --git a/pkg/webservice/handlers_test.go b/pkg/webservice/handlers_test.go
index dd264d28..4d8ca9ed 100644
--- a/pkg/webservice/handlers_test.go
+++ b/pkg/webservice/handlers_test.go
@@ -2372,7 +2372,7 @@ func addEvents(t *testing.T) (appEvent, nodeEvent, queueEvent *si.EventRecord) {
                ReferenceID:       "app",
        }
        ev.AddEvent(queueEvent)
-       noEvents := 0
+       noEvents := uint64(0)
        err := common.WaitFor(10*time.Millisecond, time.Second, func() bool {
                noEvents = ev.Store.CountStoredEvents()
                return noEvents == 3


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to