This is an automated email from the ASF dual-hosted git repository.
wilfreds pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/yunikorn-core.git
The following commit(s) were added to refs/heads/master by this push:
new b62164a2 [YUNIKORN-2132] Show active event streaming in the state dump (#707)
b62164a2 is described below
commit b62164a213bca1f488328bf67b9a08b4e8cbe508
Author: Peter Bacsko <[email protected]>
AuthorDate: Thu Feb 1 13:17:36 2024 +1100
[YUNIKORN-2132] Show active event streaming in the state dump (#707)
A list of active streams for events is added to the state dump.
Creation time and originating host name are shown. No usage data is
included.
Closes: #707
Signed-off-by: Wilfred Spiegelenburg <[email protected]>
---
pkg/events/event_streaming.go | 23 ++++++++++++++++++++++-
pkg/events/event_streaming_test.go | 20 ++++++++++++++++++++
pkg/events/event_system.go | 8 ++++++++
pkg/events/event_system_test.go | 13 +++++++++++++
pkg/scheduler/objects/common_test.go | 4 ++++
pkg/webservice/state_dump.go | 3 +++
6 files changed, 70 insertions(+), 1 deletion(-)
diff --git a/pkg/events/event_streaming.go b/pkg/events/event_streaming.go
index 4f7b9d26..3896e24d 100644
--- a/pkg/events/event_streaming.go
+++ b/pkg/events/event_streaming.go
@@ -36,7 +36,7 @@ type EventStreaming struct {
buffer *eventRingBuffer
stopCh chan struct{}
eventStreams map[*EventStream]eventConsumerDetails
- sync.Mutex
+ sync.RWMutex
}
type eventConsumerDetails struct {
@@ -47,6 +47,12 @@ type eventConsumerDetails struct {
createdAt time.Time
}
+// EventStreamData contains data about an event stream.
+type EventStreamData struct {
+ Name string
+ CreatedAt time.Time
+}
+
// EventStream handle type returned to the client that wants to capture the stream of events.
type EventStream struct {
Events <-chan *si.EventRecord
@@ -169,6 +175,21 @@ func (e *EventStreaming) Close() {
close(e.stopCh)
}
+// GetEventStreams returns the current active event streams.
+func (e *EventStreaming) GetEventStreams() []EventStreamData {
+ e.RLock()
+ defer e.RUnlock()
+ var streams []EventStreamData
+ for _, s := range e.eventStreams {
+ streams = append(streams, EventStreamData{
+ Name: s.name,
+ CreatedAt: s.createdAt,
+ })
+ }
+
+ return streams
+}
+
// NewEventStreaming creates a new event streaming infrastructure.
func NewEventStreaming(eventBuffer *eventRingBuffer) *EventStreaming {
return &EventStreaming{
diff --git a/pkg/events/event_streaming_test.go b/pkg/events/event_streaming_test.go
index 8afc770a..c559c424 100644
--- a/pkg/events/event_streaming_test.go
+++ b/pkg/events/event_streaming_test.go
@@ -134,6 +134,26 @@ func TestEventStreaming_SlowConsumer(t *testing.T) {
assert.Equal(t, 0, len(streaming.eventStreams))
}
+func TestGetEventStreams(t *testing.T) {
+ buffer := newEventRingBuffer(10)
+ streaming := NewEventStreaming(buffer)
+ defer streaming.Close()
+
+ streaming.CreateEventStream("test-1", 0)
+ streams := streaming.GetEventStreams()
+ assert.Equal(t, 1, len(streams))
+ assert.Equal(t, "test-1", streams[0].Name)
+
+ streaming.CreateEventStream("test-2", 0)
+ streams = streaming.GetEventStreams()
+ assert.Equal(t, 2, len(streams))
+ names := make(map[string]bool)
+ names[streams[0].Name] = true
+ names[streams[1].Name] = true
+ assert.Assert(t, names["test-2"])
+ assert.Assert(t, names["test-1"])
+}
+
func receive(t *testing.T, input <-chan *si.EventRecord) *si.EventRecord {
select {
case event := <-input:
diff --git a/pkg/events/event_system.go b/pkg/events/event_system.go
index e89f6289..70ce2f26 100644
--- a/pkg/events/event_system.go
+++ b/pkg/events/event_system.go
@@ -73,6 +73,9 @@ type EventSystem interface {
// Consumers that no longer wish to be updated (e.g., a remote client
// disconnected) *must* call this method to gracefully stop the streaming.
RemoveStream(*EventStream)
+
+ // GetEventStreams returns the current active event streams.
+ GetEventStreams() []EventStreamData
}
// EventSystemImpl main implementation of the event system which is used for history tracking.
@@ -254,6 +257,11 @@ func (ec *EventSystemImpl) Restart() {
ec.StartServiceWithPublisher(true)
}
+// GetEventStreams returns the current active event streams.
+func (ec *EventSystemImpl) GetEventStreams() []EventStreamData {
+ return ec.streaming.GetEventStreams()
+}
+
// VisibleForTesting
func (ec *EventSystemImpl) CloseAllStreams() {
ec.streaming.Lock()
diff --git a/pkg/events/event_system_test.go b/pkg/events/event_system_test.go
index 0864f3d7..f00961e2 100644
--- a/pkg/events/event_system_test.go
+++ b/pkg/events/event_system_test.go
@@ -141,3 +141,16 @@ func TestConfigUpdate(t *testing.T) {
assert.Equal(t, eventSystem.GetRequestCapacity(), newRequestCapacity)
assert.Equal(t, eventSystem.eventBuffer.capacity, newRingBufferCapacity)
}
+
+func TestEventStreaming(t *testing.T) {
+ Init()
+ eventSystem := GetEventSystem()
+ eventSystem.StartService()
+ defer eventSystem.Stop()
+
+ eventSystem.CreateEventStream("test", 10)
+ streams := eventSystem.GetEventStreams()
+
+ assert.Equal(t, 1, len(streams))
+ assert.Equal(t, "test", streams[0].Name)
+}
diff --git a/pkg/scheduler/objects/common_test.go b/pkg/scheduler/objects/common_test.go
index 290a32fa..52dd536f 100644
--- a/pkg/scheduler/objects/common_test.go
+++ b/pkg/scheduler/objects/common_test.go
@@ -56,6 +56,10 @@ func (m *EventSystemMock) IsEventTrackingEnabled() bool {
return m.enabled
}
+func (m *EventSystemMock) GetEventStreams() []events.EventStreamData {
+ return nil
+}
+
func newEventSystemMock() *EventSystemMock {
return &EventSystemMock{events: make([]*si.EventRecord, 0), enabled: true}
}
diff --git a/pkg/webservice/state_dump.go b/pkg/webservice/state_dump.go
index c607d175..8aeb5c34 100644
--- a/pkg/webservice/state_dump.go
+++ b/pkg/webservice/state_dump.go
@@ -26,6 +26,7 @@ import (
"sync"
"time"
+ "github.com/apache/yunikorn-core/pkg/events"
yunikornLog "github.com/apache/yunikorn-core/pkg/log"
"github.com/apache/yunikorn-core/pkg/webservice/dao"
)
@@ -48,6 +49,7 @@ type AggregatedStateInfo struct {
RMDiagnostics map[string]interface{} `json:"rmDiagnostics,omitempty"`
LogLevel string `json:"logLevel,omitempty"`
Config *dao.ConfigDAOInfo `json:"config,omitempty"`
+ EventStreams []events.EventStreamData `json:"eventStreams,omitempty"`
}
func getFullStateDump(w http.ResponseWriter, r *http.Request) {
@@ -82,6 +84,7 @@ func doStateDump(w io.Writer) error {
RMDiagnostics: getResourceManagerDiagnostics(),
LogLevel: zapConfig.Level.Level().String(),
Config: getClusterConfigDAO(),
+ EventStreams: events.GetEventSystem().GetEventStreams(),
}
var prettyJSON []byte
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]