This is an automated email from the ASF dual-hosted git repository.
pbacsko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/yunikorn-core.git
The following commit(s) were added to refs/heads/master by this push:
new b9899e2a [YUNIKORN-2409] History tracking minor code cleanup (#797)
b9899e2a is described below
commit b9899e2ad9df5414b007de4829d01035d95e088b
Author: Peter Bacsko <[email protected]>
AuthorDate: Mon Feb 12 17:14:35 2024 +0100
[YUNIKORN-2409] History tracking minor code cleanup (#797)
Small cleanup in various parts of app history tracking.
Closes: #797
Signed-off-by: Peter Bacsko <[email protected]>
---
pkg/events/event_ringbuffer.go | 2 --
pkg/events/event_streaming.go | 5 ++---
pkg/scheduler/objects/application_events.go | 5 -----
pkg/webservice/handlers.go | 4 +---
4 files changed, 3 insertions(+), 13 deletions(-)
diff --git a/pkg/events/event_ringbuffer.go b/pkg/events/event_ringbuffer.go
index a4308645..4ddea827 100644
--- a/pkg/events/event_ringbuffer.go
+++ b/pkg/events/event_ringbuffer.go
@@ -102,8 +102,6 @@ func (e *eventRingBuffer) GetEventsFromID(id uint64, count
uint64) ([]*si.EventR
// getEventsFromID unlocked version of GetEventsFromID
func (e *eventRingBuffer) getEventsFromID(id uint64, count uint64)
([]*si.EventRecord, uint64, uint64) {
- e.RLock()
- defer e.RUnlock()
lowest := e.getLowestID()
pos, idFound := e.id2pos(id)
diff --git a/pkg/events/event_streaming.go b/pkg/events/event_streaming.go
index 3896e24d..a43d1fa5 100644
--- a/pkg/events/event_streaming.go
+++ b/pkg/events/event_streaming.go
@@ -94,7 +94,7 @@ func (e *EventStreaming) CreateEventStream(name string, count
uint64) *EventStre
}
local := make(chan *si.EventRecord, defaultChannelBufSize)
stop := make(chan struct{})
- e.createEventStreamInternal(stream, local, consumer, stop, name, count)
+ e.createEventStreamInternal(stream, local, consumer, stop, name)
history := e.buffer.GetRecentEvents(count)
go func(consumer chan<- *si.EventRecord, local <-chan *si.EventRecord,
stop <-chan struct{}) {
@@ -137,8 +137,7 @@ func (e *EventStreaming) createEventStreamInternal(stream
*EventStream,
local chan *si.EventRecord,
consumer chan *si.EventRecord,
stop chan struct{},
- name string,
- count uint64) {
+ name string) {
// stuff that needs locking
e.Lock()
defer e.Unlock()
diff --git a/pkg/scheduler/objects/application_events.go
b/pkg/scheduler/objects/application_events.go
index 87b0a7b3..58a24ab9 100644
--- a/pkg/scheduler/objects/application_events.go
+++ b/pkg/scheduler/objects/application_events.go
@@ -20,9 +20,6 @@ package objects
import (
"fmt"
- "time"
-
- "golang.org/x/time/rate"
"github.com/apache/yunikorn-core/pkg/common"
"github.com/apache/yunikorn-core/pkg/events"
@@ -32,7 +29,6 @@ import (
type applicationEvents struct {
eventSystem events.EventSystem
app *Application
- limiter *rate.Limiter
}
func (evt *applicationEvents) sendPlaceholderLargerEvent(ph *Allocation,
request *AllocationAsk) {
@@ -119,6 +115,5 @@ func newApplicationEvents(app *Application, evt
events.EventSystem) *application
return &applicationEvents{
eventSystem: evt,
app: app,
- limiter: rate.NewLimiter(rate.Every(time.Second), 1),
}
}
diff --git a/pkg/webservice/handlers.go b/pkg/webservice/handlers.go
index 6f4f7105..cbd62c82 100644
--- a/pkg/webservice/handlers.go
+++ b/pkg/webservice/handlers.go
@@ -1140,6 +1140,7 @@ func getStream(w http.ResponseWriter, r *http.Request) {
}
enc := json.NewEncoder(w)
stream := eventSystem.CreateEventStream(r.Host, count)
+ defer eventSystem.RemoveStream(stream)
// Reading events in an infinite loop until either the client
disconnects or Yunikorn closes the channel.
// This results in a persistent HTTP connection where the message body
is never closed.
@@ -1149,7 +1150,6 @@ func getStream(w http.ResponseWriter, r *http.Request) {
case <-r.Context().Done():
log.Log(log.REST).Info("Connection closed for event
stream client",
zap.String("host", r.Host))
- eventSystem.RemoveStream(stream)
return
case e, ok := <-stream.Events:
err := rc.SetWriteDeadline(time.Now().Add(5 *
time.Second))
@@ -1157,7 +1157,6 @@ func getStream(w http.ResponseWriter, r *http.Request) {
// should not fail at this point
log.Log(log.REST).Error("Cannot set write
deadline", zap.Error(err))
buildJSONErrorResponse(w, fmt.Sprintf("Cannot
set write deadline: %v", err), http.StatusInternalServerError)
- eventSystem.RemoveStream(stream)
return
}
@@ -1173,7 +1172,6 @@ func getStream(w http.ResponseWriter, r *http.Request) {
log.Log(log.REST).Error("Marshalling error",
zap.String("host", r.Host))
buildJSONErrorResponse(w, err.Error(),
http.StatusOK) // status code is 200 at this point, cannot be changed
- eventSystem.RemoveStream(stream)
return
}
f.Flush()
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]