This is an automated email from the ASF dual-hosted git repository.

wilfreds pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/yunikorn-core.git


The following commit(s) were added to refs/heads/master by this push:
     new ea422103 [YUNIKORN-1942] Null Batch API Response after buffer size change (#635)
ea422103 is described below

commit ea422103e51844b598988d42e39e97ca2842df82
Author: Peter Bacsko <[email protected]>
AuthorDate: Fri Sep 8 12:56:20 2023 +1000

    [YUNIKORN-1942] Null Batch API Response after buffer size change (#635)
    
    When the ring buffer size is changed, the first ID in the buffer needs to
    be updated. Normally the first ID is 0, but after a resize it can be an
    arbitrary number. That breaks id2pos and causes a nil return in the
    REST call.
    
    Closes: #635
    
    Signed-off-by: Wilfred Spiegelenburg <[email protected]>
---
 pkg/events/event_ringbuffer.go      | 79 +++++++++++++------------------------
 pkg/events/event_ringbuffer_test.go | 14 +++++--
 pkg/webservice/routes.go            |  2 +-
 3 files changed, 39 insertions(+), 56 deletions(-)

diff --git a/pkg/events/event_ringbuffer.go b/pkg/events/event_ringbuffer.go
index 71ab8cfa..479ea70f 100644
--- a/pkg/events/event_ringbuffer.go
+++ b/pkg/events/event_ringbuffer.go
@@ -35,19 +35,21 @@ type eventRange struct {
 
 // eventRingBuffer A specialized circular buffer to store event objects.
 //
-// Unlike to regular circular buffers, existing entries are never directly 
removed and new entries can be added if the buffer is full.
+// Unlike regular circular buffers, existing entries are never directly removed
+// and new entries can be added if the buffer is full.
 // In this case, the oldest entry is overwritten and can be collected by the 
GC.
-// Each event has an ID, however, this mapping is not stored directly. If 
needed, we calculate the id
-// of the event based on slice positions.
+// Each event has an ID; however, this mapping is not stored directly.
+// If needed, we calculate the id of the event based on slice positions.
 //
-// Retrieving the records can be achieved with GetEventsFromID and 
GetRecentEntries.
+// Retrieving the records can be achieved with GetEventsFromID.
 type eventRingBuffer struct {
-       events   []*si.EventRecord
-       capacity uint64 // capacity of the buffer
-       head     uint64 // position of the next element (no tail since we don't 
remove elements)
-       full     bool   // indicates whether the buffer if full - once it is, 
it stays full unless buffer is resized
-       id       uint64 // unique id of an event record
-       lowestId uint64 // lowest id of an event record available in the buffer 
at any given time
+       events       []*si.EventRecord
+       capacity     uint64 // capacity of the buffer
+       head         uint64 // position of the next element (no tail since we 
don't remove elements)
+       full         bool   // indicates whether the buffer if full - once it 
is, it stays full unless the buffer is resized
+       id           uint64 // unique id of an event record
+       lowestId     uint64 // lowest id of an event record available in the 
buffer at any given time
+       resizeOffset uint64 // used to aid the calculation of id->pos after 
resize (see id2pos)
 
        sync.RWMutex
 }
@@ -62,16 +64,13 @@ func (e *eventRingBuffer) Add(event *si.EventRecord) {
        if !e.full {
                e.full = e.head == e.capacity-1
        } else {
-               // lowest event id updates when new event added to a full buffer
-               log.Log(log.Events).Debug("event buffer full, oldest event will 
be lost",
-                       zap.String("id", strconv.FormatUint(e.lowestId, 10)))
                e.lowestId++
        }
        e.head = (e.head + 1) % e.capacity
        e.id++
 }
 
-// GetEventsFromID returns "count" number of event records from id if 
possible. The id can be determined from
+// GetEventsFromID returns "count" number of event records from "id" if 
possible. The id can be determined from
 // the first call of the method - if it returns nothing because the id is not 
in the buffer, the lowest valid
 // identifier is returned which can be used to get the first batch.
 // If the caller does not want to pose limit on the number of events returned, 
"count" must be set to a high
@@ -162,36 +161,18 @@ func (e *eventRingBuffer) getEntriesFromRanges(r1, r2 
*eventRange) []*si.EventRe
 }
 
 // id2pos translates the unique event ID to an index in the event slice.
-// If the event is present the position will be returned and the found flag 
will be true.
-// In the case that the event ID is not present the position returned is 0 and 
the flag false.
+// If the event is present, the position will be returned and the found flag 
will be true.
+// If the event ID is not present, the position returned is 0 and the flag is 
false.
 func (e *eventRingBuffer) id2pos(id uint64) (uint64, bool) {
-       pos := id % e.capacity
-       var calculatedID uint64 // calculated ID based on index values
-       if pos > e.head {
-               diff := pos - e.head
-               calculatedID = e.getLowestID() + diff
-       } else {
-               pId := e.id - 1
-               idAtZero := pId - (pId % e.capacity) // unique id at slice 
position 0
-               calculatedID = idAtZero + pos
-       }
-
-       if !e.full {
-               if e.head == 0 {
-                       // empty
-                       return 0, false
-               }
-               if pos >= e.head {
-                       // "pos" is not in the [0..head-1] range
-                       return 0, false
-               }
+       // id out of range?
+       if id < e.lowestId || id >= e.id {
+               return 0, false
        }
 
-       if calculatedID != id {
-               return calculatedID, false
-       }
-
-       return pos, true
+       // resizeOffset tells how many elements were "shifted out" after 
resizing the buffer
+       // eg a buffer with 10 elements is full, then gets resized to 6
+       // the first element at index 0 is no longer 0 or the multiples of 10, 
but 4, 16, 22, etc.
+       return (id - e.resizeOffset) % e.capacity, true
 }
 
 // getLowestID returns the current lowest available id in the buffer.
@@ -206,7 +187,7 @@ func newEventRingBuffer(capacity uint64) *eventRingBuffer {
        }
 }
 
-// called from Resize(), This functuin updates the lowest event id available 
in the buffer
+// called from Resize(), this function updates the lowest event id available 
in the buffer
 func (e *eventRingBuffer) updateLowestId(beginSize, endSize uint64) {
        // if buffer size is increasing, lowestId stays the same
        if beginSize < endSize {
@@ -214,7 +195,7 @@ func (e *eventRingBuffer) updateLowestId(beginSize, endSize 
uint64) {
        }
 
        // bufferSize is shrinking
-       // if number of events is < newSize no change
+       // if the number of events is < newSize, then no change
        if (e.id - e.getLowestID()) <= endSize {
                return
        }
@@ -223,22 +204,18 @@ func (e *eventRingBuffer) updateLowestId(beginSize, 
endSize uint64) {
        e.lowestId = e.id - endSize
 }
 
-// resize the existing ring buffer
+// Resize resizes the existing ring buffer
 // this method will be called upon configuration reload
 func (e *eventRingBuffer) Resize(newSize uint64) {
        e.Lock()
        defer e.Unlock()
 
        if newSize == e.capacity {
-               return // Nothing to do if the size is the same
+               return
        }
 
        initialSize := e.capacity
-
-       // Create a new buffer with the desired size
        newEvents := make([]*si.EventRecord, newSize)
-
-       // Determine the number of events to copy
        var numEventsToCopy uint64
        if e.id-e.getLowestID() > newSize {
                numEventsToCopy = newSize
@@ -270,11 +247,9 @@ func (e *eventRingBuffer) Resize(newSize uint64) {
                copy(newEvents[e.capacity-startIndex:], e.events[:endIndex+1])
        }
 
-       // Update the buffer's state
        e.capacity = newSize
        e.events = newEvents
        e.head = numEventsToCopy % newSize
-
-       // Update e.full based on whether the buffer is full after resizing
+       e.resizeOffset = e.lowestId
        e.full = numEventsToCopy == e.capacity
 }
diff --git a/pkg/events/event_ringbuffer_test.go 
b/pkg/events/event_ringbuffer_test.go
index b155f850..6b390e69 100644
--- a/pkg/events/event_ringbuffer_test.go
+++ b/pkg/events/event_ringbuffer_test.go
@@ -192,6 +192,7 @@ func TestResize(t *testing.T) {
        assert.Equal(t, uint64(6), ringBuffer.capacity)
        assert.Equal(t, lastEventIdBeforeResize, ringBuffer.getLastEventID())
        assert.Equal(t, 6, len(ringBuffer.events))
+       assert.Equal(t, uint64(0), ringBuffer.resizeOffset)
 
        // Test case 2: Resize to a smaller size
        lastEventIdBeforeResize = ringBuffer.GetLastEventID()
@@ -199,6 +200,7 @@ func TestResize(t *testing.T) {
        assert.Equal(t, uint64(2), ringBuffer.capacity)
        assert.Equal(t, lastEventIdBeforeResize, ringBuffer.getLastEventID())
        assert.Equal(t, 2, len(ringBuffer.events))
+       assert.Equal(t, uint64(2), ringBuffer.resizeOffset)
 
        // Test case 3: Resize to a larger size
        lastEventIdBeforeResize = ringBuffer.GetLastEventID()
@@ -206,6 +208,7 @@ func TestResize(t *testing.T) {
        assert.Equal(t, uint64(20), ringBuffer.capacity)
        assert.Equal(t, lastEventIdBeforeResize, ringBuffer.getLastEventID())
        assert.Equal(t, 20, len(ringBuffer.events))
+       assert.Equal(t, uint64(2), ringBuffer.resizeOffset)
 
        // Test case 4: Resize when head is at the last element
        ringBuffer = newEventRingBuffer(5)
@@ -215,6 +218,7 @@ func TestResize(t *testing.T) {
        assert.Equal(t, uint64(2), ringBuffer.capacity)
        assert.Equal(t, lastEventIdBeforeResize, ringBuffer.getLastEventID())
        assert.Equal(t, 2, len(ringBuffer.events))
+       assert.Equal(t, uint64(2), ringBuffer.resizeOffset)
 
        // Test case 5: Resize to events length when head is at the last element
        ringBuffer = newEventRingBuffer(5)
@@ -225,7 +229,8 @@ func TestResize(t *testing.T) {
        assert.Equal(t, uint64(4), ringBuffer.capacity)
        assert.Equal(t, lastEventIdBeforeResize, ringBuffer.getLastEventID())
        assert.Equal(t, 4, len(ringBuffer.events))
-       assert.Equal(t, true, ringBuffer.full)
+       assert.Equal(t, uint64(0), ringBuffer.resizeOffset)
+       assert.Assert(t, ringBuffer.full)
 
        // Test case 6: Resize when the buffer is full
        ringBuffer = newEventRingBuffer(10)
@@ -237,6 +242,7 @@ func TestResize(t *testing.T) {
        assert.Equal(t, 6, len(ringBuffer.events))
        assert.Equal(t, uint64(0), ringBuffer.head)
        assert.Equal(t, true, ringBuffer.full)
+       assert.Equal(t, uint64(4), ringBuffer.resizeOffset)
 
        // Test case 7: Resize when the buffer is overflown (head is wrapped 
and position > 0)
        ringBuffer = newEventRingBuffer(10)
@@ -248,7 +254,8 @@ func TestResize(t *testing.T) {
        assert.Equal(t, lastEventIdBeforeResize, ringBuffer.getLastEventID())
        assert.Equal(t, 8, len(ringBuffer.events))
        assert.Equal(t, uint64(0), ringBuffer.head)
-       assert.Equal(t, true, ringBuffer.full)
+       assert.Equal(t, uint64(7), ringBuffer.resizeOffset)
+       assert.Assert(t, ringBuffer.full)
 
        // Test case 8: Test event full : Resize to lower size, followed by 
resize to a large size
        ringBuffer = newEventRingBuffer(10)
@@ -258,6 +265,7 @@ func TestResize(t *testing.T) {
        assert.Equal(t, true, ringBuffer.full)
        ringBuffer.Resize(6)
        assert.Equal(t, false, ringBuffer.full)
+       assert.Equal(t, uint64(7), ringBuffer.resizeOffset)
 
        // Test case 9: Test resize to same size
        lastEventIdBeforeResize = ringBuffer.GetLastEventID()
@@ -266,7 +274,7 @@ func TestResize(t *testing.T) {
        assert.Equal(t, lastEventIdBeforeResize, ringBuffer.getLastEventID())
        assert.Equal(t, 6, len(ringBuffer.events))
        assert.Equal(t, false, ringBuffer.full)
-
+       assert.Equal(t, uint64(7), ringBuffer.resizeOffset)
 }
 
 func populate(buffer *eventRingBuffer, count int) {
diff --git a/pkg/webservice/routes.go b/pkg/webservice/routes.go
index 632a6dbc..bbe1629c 100644
--- a/pkg/webservice/routes.go
+++ b/pkg/webservice/routes.go
@@ -185,7 +185,7 @@ var webRoutes = routes{
        route{
                "Scheduler",
                "GET",
-               "/ws/v1/events/batch/",
+               "/ws/v1/events/batch",
                getEvents,
        },
        // endpoint to retrieve CPU, Memory profiling data,


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to