This is an automated email from the ASF dual-hosted git repository.

wilfreds pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/yunikorn-core.git


The following commit(s) were added to refs/heads/master by this push:
     new b62164a2 [YUNIKORN-2132] Show active event streaming in the state dump 
(#707)
b62164a2 is described below

commit b62164a213bca1f488328bf67b9a08b4e8cbe508
Author: Peter Bacsko <[email protected]>
AuthorDate: Thu Feb 1 13:17:36 2024 +1100

    [YUNIKORN-2132] Show active event streaming in the state dump (#707)
    
    A list of active streams for events is added to the state dump.
    Creation time and originating host name are shown. No usage data is
    included.
    
    Closes: #707
    
    Signed-off-by: Wilfred Spiegelenburg <[email protected]>
---
 pkg/events/event_streaming.go        | 23 ++++++++++++++++++++++-
 pkg/events/event_streaming_test.go   | 20 ++++++++++++++++++++
 pkg/events/event_system.go           |  8 ++++++++
 pkg/events/event_system_test.go      | 13 +++++++++++++
 pkg/scheduler/objects/common_test.go |  4 ++++
 pkg/webservice/state_dump.go         |  3 +++
 6 files changed, 70 insertions(+), 1 deletion(-)

diff --git a/pkg/events/event_streaming.go b/pkg/events/event_streaming.go
index 4f7b9d26..3896e24d 100644
--- a/pkg/events/event_streaming.go
+++ b/pkg/events/event_streaming.go
@@ -36,7 +36,7 @@ type EventStreaming struct {
        buffer       *eventRingBuffer
        stopCh       chan struct{}
        eventStreams map[*EventStream]eventConsumerDetails
-       sync.Mutex
+       sync.RWMutex
 }
 
 type eventConsumerDetails struct {
@@ -47,6 +47,12 @@ type eventConsumerDetails struct {
        createdAt time.Time
 }
 
+// EventStreamData contains data about an event stream.
+type EventStreamData struct {
+       Name      string
+       CreatedAt time.Time
+}
+
 // EventStream handle type returned to the client that wants to capture the 
stream of events.
 type EventStream struct {
        Events <-chan *si.EventRecord
@@ -169,6 +175,21 @@ func (e *EventStreaming) Close() {
        close(e.stopCh)
 }
 
+// GetEventStreams returns the current active event streams.
+func (e *EventStreaming) GetEventStreams() []EventStreamData {
+       e.RLock()
+       defer e.RUnlock()
+       var streams []EventStreamData
+       for _, s := range e.eventStreams {
+               streams = append(streams, EventStreamData{
+                       Name:      s.name,
+                       CreatedAt: s.createdAt,
+               })
+       }
+
+       return streams
+}
+
 // NewEventStreaming creates a new event streaming infrastructure.
 func NewEventStreaming(eventBuffer *eventRingBuffer) *EventStreaming {
        return &EventStreaming{
diff --git a/pkg/events/event_streaming_test.go 
b/pkg/events/event_streaming_test.go
index 8afc770a..c559c424 100644
--- a/pkg/events/event_streaming_test.go
+++ b/pkg/events/event_streaming_test.go
@@ -134,6 +134,26 @@ func TestEventStreaming_SlowConsumer(t *testing.T) {
        assert.Equal(t, 0, len(streaming.eventStreams))
 }
 
+func TestGetEventStreams(t *testing.T) {
+       buffer := newEventRingBuffer(10)
+       streaming := NewEventStreaming(buffer)
+       defer streaming.Close()
+
+       streaming.CreateEventStream("test-1", 0)
+       streams := streaming.GetEventStreams()
+       assert.Equal(t, 1, len(streams))
+       assert.Equal(t, "test-1", streams[0].Name)
+
+       streaming.CreateEventStream("test-2", 0)
+       streams = streaming.GetEventStreams()
+       assert.Equal(t, 2, len(streams))
+       names := make(map[string]bool)
+       names[streams[0].Name] = true
+       names[streams[1].Name] = true
+       assert.Assert(t, names["test-2"])
+       assert.Assert(t, names["test-1"])
+}
+
 func receive(t *testing.T, input <-chan *si.EventRecord) *si.EventRecord {
        select {
        case event := <-input:
diff --git a/pkg/events/event_system.go b/pkg/events/event_system.go
index e89f6289..70ce2f26 100644
--- a/pkg/events/event_system.go
+++ b/pkg/events/event_system.go
@@ -73,6 +73,9 @@ type EventSystem interface {
        // Consumers that no longer wish to be updated (e.g., a remote client
        // disconnected) *must* call this method to gracefully stop the 
streaming.
        RemoveStream(*EventStream)
+
+       // GetEventStreams returns the current active event streams.
+       GetEventStreams() []EventStreamData
 }
 
 // EventSystemImpl main implementation of the event system which is used for 
history tracking.
@@ -254,6 +257,11 @@ func (ec *EventSystemImpl) Restart() {
        ec.StartServiceWithPublisher(true)
 }
 
+// GetEventStreams returns the current active event streams.
+func (ec *EventSystemImpl) GetEventStreams() []EventStreamData {
+       return ec.streaming.GetEventStreams()
+}
+
 // VisibleForTesting
 func (ec *EventSystemImpl) CloseAllStreams() {
        ec.streaming.Lock()
diff --git a/pkg/events/event_system_test.go b/pkg/events/event_system_test.go
index 0864f3d7..f00961e2 100644
--- a/pkg/events/event_system_test.go
+++ b/pkg/events/event_system_test.go
@@ -141,3 +141,16 @@ func TestConfigUpdate(t *testing.T) {
        assert.Equal(t, eventSystem.GetRequestCapacity(), newRequestCapacity)
        assert.Equal(t, eventSystem.eventBuffer.capacity, newRingBufferCapacity)
 }
+
+func TestEventStreaming(t *testing.T) {
+       Init()
+       eventSystem := GetEventSystem()
+       eventSystem.StartService()
+       defer eventSystem.Stop()
+
+       eventSystem.CreateEventStream("test", 10)
+       streams := eventSystem.GetEventStreams()
+
+       assert.Equal(t, 1, len(streams))
+       assert.Equal(t, "test", streams[0].Name)
+}
diff --git a/pkg/scheduler/objects/common_test.go 
b/pkg/scheduler/objects/common_test.go
index 290a32fa..52dd536f 100644
--- a/pkg/scheduler/objects/common_test.go
+++ b/pkg/scheduler/objects/common_test.go
@@ -56,6 +56,10 @@ func (m *EventSystemMock) IsEventTrackingEnabled() bool {
        return m.enabled
 }
 
+func (m *EventSystemMock) GetEventStreams() []events.EventStreamData {
+       return nil
+}
+
 func newEventSystemMock() *EventSystemMock {
        return &EventSystemMock{events: make([]*si.EventRecord, 0), enabled: 
true}
 }
diff --git a/pkg/webservice/state_dump.go b/pkg/webservice/state_dump.go
index c607d175..8aeb5c34 100644
--- a/pkg/webservice/state_dump.go
+++ b/pkg/webservice/state_dump.go
@@ -26,6 +26,7 @@ import (
        "sync"
        "time"
 
+       "github.com/apache/yunikorn-core/pkg/events"
        yunikornLog "github.com/apache/yunikorn-core/pkg/log"
        "github.com/apache/yunikorn-core/pkg/webservice/dao"
 )
@@ -48,6 +49,7 @@ type AggregatedStateInfo struct {
        RMDiagnostics    map[string]interface{}           
`json:"rmDiagnostics,omitempty"`
        LogLevel         string                           
`json:"logLevel,omitempty"`
        Config           *dao.ConfigDAOInfo               
`json:"config,omitempty"`
+       EventStreams     []events.EventStreamData         
`json:"eventStreams,omitempty"`
 }
 
 func getFullStateDump(w http.ResponseWriter, r *http.Request) {
@@ -82,6 +84,7 @@ func doStateDump(w io.Writer) error {
                RMDiagnostics:    getResourceManagerDiagnostics(),
                LogLevel:         zapConfig.Level.Level().String(),
                Config:           getClusterConfigDAO(),
+               EventStreams:     events.GetEventSystem().GetEventStreams(),
        }
 
        var prettyJSON []byte


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to