This is an automated email from the ASF dual-hosted git repository.

pbacsko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/yunikorn-core.git


The following commit(s) were added to refs/heads/master by this push:
     new b9899e2a [YUNIKORN-2409] History tracking minor code cleanup (#797)
b9899e2a is described below

commit b9899e2ad9df5414b007de4829d01035d95e088b
Author: Peter Bacsko <[email protected]>
AuthorDate: Mon Feb 12 17:14:35 2024 +0100

    [YUNIKORN-2409] History tracking minor code cleanup (#797)
    
    Small cleanup in various parts of app history tracking.
    
    Closes: #797
    
    Signed-off-by: Peter Bacsko <[email protected]>
---
 pkg/events/event_ringbuffer.go              | 2 --
 pkg/events/event_streaming.go               | 5 ++---
 pkg/scheduler/objects/application_events.go | 5 -----
 pkg/webservice/handlers.go                  | 4 +---
 4 files changed, 3 insertions(+), 13 deletions(-)

diff --git a/pkg/events/event_ringbuffer.go b/pkg/events/event_ringbuffer.go
index a4308645..4ddea827 100644
--- a/pkg/events/event_ringbuffer.go
+++ b/pkg/events/event_ringbuffer.go
@@ -102,8 +102,6 @@ func (e *eventRingBuffer) GetEventsFromID(id uint64, count uint64) ([]*si.EventR
 
 // getEventsFromID unlocked version of GetEventsFromID
 func (e *eventRingBuffer) getEventsFromID(id uint64, count uint64) ([]*si.EventRecord, uint64, uint64) {
-       e.RLock()
-       defer e.RUnlock()
        lowest := e.getLowestID()
 
        pos, idFound := e.id2pos(id)
diff --git a/pkg/events/event_streaming.go b/pkg/events/event_streaming.go
index 3896e24d..a43d1fa5 100644
--- a/pkg/events/event_streaming.go
+++ b/pkg/events/event_streaming.go
@@ -94,7 +94,7 @@ func (e *EventStreaming) CreateEventStream(name string, count uint64) *EventStre
        }
        local := make(chan *si.EventRecord, defaultChannelBufSize)
        stop := make(chan struct{})
-       e.createEventStreamInternal(stream, local, consumer, stop, name, count)
+       e.createEventStreamInternal(stream, local, consumer, stop, name)
        history := e.buffer.GetRecentEvents(count)
 
        go func(consumer chan<- *si.EventRecord, local <-chan *si.EventRecord, stop <-chan struct{}) {
@@ -137,8 +137,7 @@ func (e *EventStreaming) createEventStreamInternal(stream *EventStream,
        local chan *si.EventRecord,
        consumer chan *si.EventRecord,
        stop chan struct{},
-       name string,
-       count uint64) {
+       name string) {
        // stuff that needs locking
        e.Lock()
        defer e.Unlock()
diff --git a/pkg/scheduler/objects/application_events.go b/pkg/scheduler/objects/application_events.go
index 87b0a7b3..58a24ab9 100644
--- a/pkg/scheduler/objects/application_events.go
+++ b/pkg/scheduler/objects/application_events.go
@@ -20,9 +20,6 @@ package objects
 
 import (
        "fmt"
-       "time"
-
-       "golang.org/x/time/rate"
 
        "github.com/apache/yunikorn-core/pkg/common"
        "github.com/apache/yunikorn-core/pkg/events"
@@ -32,7 +29,6 @@ import (
 type applicationEvents struct {
        eventSystem events.EventSystem
        app         *Application
-       limiter     *rate.Limiter
 }
 
 func (evt *applicationEvents) sendPlaceholderLargerEvent(ph *Allocation, request *AllocationAsk) {
@@ -119,6 +115,5 @@ func newApplicationEvents(app *Application, evt events.EventSystem) *application
        return &applicationEvents{
                eventSystem: evt,
                app:         app,
-               limiter:     rate.NewLimiter(rate.Every(time.Second), 1),
        }
 }
diff --git a/pkg/webservice/handlers.go b/pkg/webservice/handlers.go
index 6f4f7105..cbd62c82 100644
--- a/pkg/webservice/handlers.go
+++ b/pkg/webservice/handlers.go
@@ -1140,6 +1140,7 @@ func getStream(w http.ResponseWriter, r *http.Request) {
        }
        enc := json.NewEncoder(w)
        stream := eventSystem.CreateEventStream(r.Host, count)
+       defer eventSystem.RemoveStream(stream)
 
        // Reading events in an infinite loop until either the client disconnects or Yunikorn closes the channel.
        // This results in a persistent HTTP connection where the message body is never closed.
@@ -1149,7 +1150,6 @@ func getStream(w http.ResponseWriter, r *http.Request) {
                case <-r.Context().Done():
                        log.Log(log.REST).Info("Connection closed for event stream client",
                                zap.String("host", r.Host))
-                       eventSystem.RemoveStream(stream)
                        return
                case e, ok := <-stream.Events:
                        err := rc.SetWriteDeadline(time.Now().Add(5 * time.Second))
@@ -1157,7 +1157,6 @@ func getStream(w http.ResponseWriter, r *http.Request) {
                                // should not fail at this point
                                log.Log(log.REST).Error("Cannot set write deadline", zap.Error(err))
                                buildJSONErrorResponse(w, fmt.Sprintf("Cannot set write deadline: %v", err), http.StatusInternalServerError)
-                               eventSystem.RemoveStream(stream)
                                return
                        }
 
@@ -1173,7 +1172,6 @@ func getStream(w http.ResponseWriter, r *http.Request) {
                                log.Log(log.REST).Error("Marshalling error",
                                        zap.String("host", r.Host))
                                buildJSONErrorResponse(w, err.Error(), http.StatusOK) // status code is 200 at this point, cannot be changed
-                               eventSystem.RemoveStream(stream)
                                return
                        }
                        f.Flush()


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to