This is an automated email from the ASF dual-hosted git repository.

pbacsko pushed a commit to branch yunikorn-1.3.1
in repository https://gitbox.apache.org/repos/asf/yunikorn-core.git

commit 65d329fc4b73d35dcd66c019968e38e8619fb736
Author: Peter Bacsko <[email protected]>
AuthorDate: Fri Sep 8 12:56:20 2023 +1000

    [YUNIKORN-1942] Null Batch API Response after buffer size change (#635)

    When the ring buffer size is changed the first ID in the buffer needs to
    be updated. Normally the first ID is 0 but after resize this could be
    any random number. That breaks id2pos and causes a nil return in the
    REST call.

    Closes: #635

    Signed-off-by: Wilfred Spiegelenburg <[email protected]>
    (cherry picked from commit ea422103e51844b598988d42e39e97ca2842df82)
---
 pkg/events/event_ringbuffer.go      | 79 +++++++++++++------------------------
 pkg/events/event_ringbuffer_test.go | 14 +++++--
 pkg/webservice/routes.go            |  2 +-
 3 files changed, 39 insertions(+), 56 deletions(-)

diff --git a/pkg/events/event_ringbuffer.go b/pkg/events/event_ringbuffer.go
index 71ab8cfa..479ea70f 100644
--- a/pkg/events/event_ringbuffer.go
+++ b/pkg/events/event_ringbuffer.go
@@ -35,19 +35,21 @@ type eventRange struct {
 
 // eventRingBuffer A specialized circular buffer to store event objects.
 //
-// Unlike to regular circular buffers, existing entries are never directly removed and new entries can be added if the buffer is full.
+// Unlike regular circular buffers, existing entries are never directly removed
+// and new entries can be added if the buffer is full.
 // In this case, the oldest entry is overwritten and can be collected by the GC.
-// Each event has an ID, however, this mapping is not stored directly. If needed, we calculate the id
-// of the event based on slice positions.
+// Each event has an ID; however, this mapping is not stored directly.
+// If needed, we calculate the id of the event based on slice positions.
 //
-// Retrieving the records can be achieved with GetEventsFromID and GetRecentEntries.
+// Retrieving the records can be achieved with GetEventsFromID.
type eventRingBuffer struct { - events []*si.EventRecord - capacity uint64 // capacity of the buffer - head uint64 // position of the next element (no tail since we don't remove elements) - full bool // indicates whether the buffer if full - once it is, it stays full unless buffer is resized - id uint64 // unique id of an event record - lowestId uint64 // lowest id of an event record available in the buffer at any given time + events []*si.EventRecord + capacity uint64 // capacity of the buffer + head uint64 // position of the next element (no tail since we don't remove elements) + full bool // indicates whether the buffer if full - once it is, it stays full unless the buffer is resized + id uint64 // unique id of an event record + lowestId uint64 // lowest id of an event record available in the buffer at any given time + resizeOffset uint64 // used to aid the calculation of id->pos after resize (see id2pos) sync.RWMutex } @@ -62,16 +64,13 @@ func (e *eventRingBuffer) Add(event *si.EventRecord) { if !e.full { e.full = e.head == e.capacity-1 } else { - // lowest event id updates when new event added to a full buffer - log.Log(log.Events).Debug("event buffer full, oldest event will be lost", - zap.String("id", strconv.FormatUint(e.lowestId, 10))) e.lowestId++ } e.head = (e.head + 1) % e.capacity e.id++ } -// GetEventsFromID returns "count" number of event records from id if possible. The id can be determined from +// GetEventsFromID returns "count" number of event records from "id" if possible. The id can be determined from // the first call of the method - if it returns nothing because the id is not in the buffer, the lowest valid // identifier is returned which can be used to get the first batch. 
// If the caller does not want to pose limit on the number of events returned, "count" must be set to a high @@ -162,36 +161,18 @@ func (e *eventRingBuffer) getEntriesFromRanges(r1, r2 *eventRange) []*si.EventRe } // id2pos translates the unique event ID to an index in the event slice. -// If the event is present the position will be returned and the found flag will be true. -// In the case that the event ID is not present the position returned is 0 and the flag false. +// If the event is present, the position will be returned and the found flag will be true. +// If the event ID is not present, the position returned is 0 and the flag is false. func (e *eventRingBuffer) id2pos(id uint64) (uint64, bool) { - pos := id % e.capacity - var calculatedID uint64 // calculated ID based on index values - if pos > e.head { - diff := pos - e.head - calculatedID = e.getLowestID() + diff - } else { - pId := e.id - 1 - idAtZero := pId - (pId % e.capacity) // unique id at slice position 0 - calculatedID = idAtZero + pos - } - - if !e.full { - if e.head == 0 { - // empty - return 0, false - } - if pos >= e.head { - // "pos" is not in the [0..head-1] range - return 0, false - } + // id out of range? + if id < e.lowestId || id >= e.id { + return 0, false } - if calculatedID != id { - return calculatedID, false - } - - return pos, true + // resizeOffset tells how many elements were "shifted out" after resizing the buffer + // eg a buffer with 10 elements is full, then gets resized to 6 + // the first element at index 0 is no longer 0 or the multiples of 10, but 4, 16, 22, etc. + return (id - e.resizeOffset) % e.capacity, true } // getLowestID returns the current lowest available id in the buffer. 
@@ -206,7 +187,7 @@ func newEventRingBuffer(capacity uint64) *eventRingBuffer { } } -// called from Resize(), This functuin updates the lowest event id available in the buffer +// called from Resize(), this function updates the lowest event id available in the buffer func (e *eventRingBuffer) updateLowestId(beginSize, endSize uint64) { // if buffer size is increasing, lowestId stays the same if beginSize < endSize { @@ -214,7 +195,7 @@ func (e *eventRingBuffer) updateLowestId(beginSize, endSize uint64) { } // bufferSize is shrinking - // if number of events is < newSize no change + // if the number of events is < newSize, then no change if (e.id - e.getLowestID()) <= endSize { return } @@ -223,22 +204,18 @@ func (e *eventRingBuffer) updateLowestId(beginSize, endSize uint64) { e.lowestId = e.id - endSize } -// resize the existing ring buffer +// Resize resizes the existing ring buffer // this method will be called upon configuration reload func (e *eventRingBuffer) Resize(newSize uint64) { e.Lock() defer e.Unlock() if newSize == e.capacity { - return // Nothing to do if the size is the same + return } initialSize := e.capacity - - // Create a new buffer with the desired size newEvents := make([]*si.EventRecord, newSize) - - // Determine the number of events to copy var numEventsToCopy uint64 if e.id-e.getLowestID() > newSize { numEventsToCopy = newSize @@ -270,11 +247,9 @@ func (e *eventRingBuffer) Resize(newSize uint64) { copy(newEvents[e.capacity-startIndex:], e.events[:endIndex+1]) } - // Update the buffer's state e.capacity = newSize e.events = newEvents e.head = numEventsToCopy % newSize - - // Update e.full based on whether the buffer is full after resizing + e.resizeOffset = e.lowestId e.full = numEventsToCopy == e.capacity } diff --git a/pkg/events/event_ringbuffer_test.go b/pkg/events/event_ringbuffer_test.go index b155f850..6b390e69 100644 --- a/pkg/events/event_ringbuffer_test.go +++ b/pkg/events/event_ringbuffer_test.go @@ -192,6 +192,7 @@ func 
TestResize(t *testing.T) { assert.Equal(t, uint64(6), ringBuffer.capacity) assert.Equal(t, lastEventIdBeforeResize, ringBuffer.getLastEventID()) assert.Equal(t, 6, len(ringBuffer.events)) + assert.Equal(t, uint64(0), ringBuffer.resizeOffset) // Test case 2: Resize to a smaller size lastEventIdBeforeResize = ringBuffer.GetLastEventID() @@ -199,6 +200,7 @@ func TestResize(t *testing.T) { assert.Equal(t, uint64(2), ringBuffer.capacity) assert.Equal(t, lastEventIdBeforeResize, ringBuffer.getLastEventID()) assert.Equal(t, 2, len(ringBuffer.events)) + assert.Equal(t, uint64(2), ringBuffer.resizeOffset) // Test case 3: Resize to a larger size lastEventIdBeforeResize = ringBuffer.GetLastEventID() @@ -206,6 +208,7 @@ func TestResize(t *testing.T) { assert.Equal(t, uint64(20), ringBuffer.capacity) assert.Equal(t, lastEventIdBeforeResize, ringBuffer.getLastEventID()) assert.Equal(t, 20, len(ringBuffer.events)) + assert.Equal(t, uint64(2), ringBuffer.resizeOffset) // Test case 4: Resize when head is at the last element ringBuffer = newEventRingBuffer(5) @@ -215,6 +218,7 @@ func TestResize(t *testing.T) { assert.Equal(t, uint64(2), ringBuffer.capacity) assert.Equal(t, lastEventIdBeforeResize, ringBuffer.getLastEventID()) assert.Equal(t, 2, len(ringBuffer.events)) + assert.Equal(t, uint64(2), ringBuffer.resizeOffset) // Test case 5: Resize to events length when head is at the last element ringBuffer = newEventRingBuffer(5) @@ -225,7 +229,8 @@ func TestResize(t *testing.T) { assert.Equal(t, uint64(4), ringBuffer.capacity) assert.Equal(t, lastEventIdBeforeResize, ringBuffer.getLastEventID()) assert.Equal(t, 4, len(ringBuffer.events)) - assert.Equal(t, true, ringBuffer.full) + assert.Equal(t, uint64(0), ringBuffer.resizeOffset) + assert.Assert(t, ringBuffer.full) // Test case 6: Resize when the buffer is full ringBuffer = newEventRingBuffer(10) @@ -237,6 +242,7 @@ func TestResize(t *testing.T) { assert.Equal(t, 6, len(ringBuffer.events)) assert.Equal(t, uint64(0), ringBuffer.head) 
assert.Equal(t, true, ringBuffer.full) + assert.Equal(t, uint64(4), ringBuffer.resizeOffset) // Test case 7: Resize when the buffer is overflown (head is wrapped and position > 0) ringBuffer = newEventRingBuffer(10) @@ -248,7 +254,8 @@ func TestResize(t *testing.T) { assert.Equal(t, lastEventIdBeforeResize, ringBuffer.getLastEventID()) assert.Equal(t, 8, len(ringBuffer.events)) assert.Equal(t, uint64(0), ringBuffer.head) - assert.Equal(t, true, ringBuffer.full) + assert.Equal(t, uint64(7), ringBuffer.resizeOffset) + assert.Assert(t, ringBuffer.full) // Test case 8: Test event full : Resize to lower size, followed by resize to a large size ringBuffer = newEventRingBuffer(10) @@ -258,6 +265,7 @@ func TestResize(t *testing.T) { assert.Equal(t, true, ringBuffer.full) ringBuffer.Resize(6) assert.Equal(t, false, ringBuffer.full) + assert.Equal(t, uint64(7), ringBuffer.resizeOffset) // Test case 9: Test resize to same size lastEventIdBeforeResize = ringBuffer.GetLastEventID() @@ -266,7 +274,7 @@ func TestResize(t *testing.T) { assert.Equal(t, lastEventIdBeforeResize, ringBuffer.getLastEventID()) assert.Equal(t, 6, len(ringBuffer.events)) assert.Equal(t, false, ringBuffer.full) - + assert.Equal(t, uint64(7), ringBuffer.resizeOffset) } func populate(buffer *eventRingBuffer, count int) { diff --git a/pkg/webservice/routes.go b/pkg/webservice/routes.go index 632a6dbc..bbe1629c 100644 --- a/pkg/webservice/routes.go +++ b/pkg/webservice/routes.go @@ -185,7 +185,7 @@ var webRoutes = routes{ route{ "Scheduler", "GET", - "/ws/v1/events/batch/", + "/ws/v1/events/batch", getEvents, }, // endpoint to retrieve CPU, Memory profiling data, --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
