This is an automated email from the ASF dual-hosted git repository.
zhuqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/yunikorn-core.git
The following commit(s) were added to refs/heads/master by this push:
new 433d5462 [YUNIKORN-2118] Add smoke test for event streaming (#705)
433d5462 is described below
commit 433d5462453276f983f852c21b99442c95394031
Author: Peter Bacsko <[email protected]>
AuthorDate: Fri Feb 2 12:02:36 2024 +0800
[YUNIKORN-2118] Add smoke test for event streaming (#705)
Code review
Closes: #705
Signed-off-by: qzhu <[email protected]>
---
pkg/scheduler/tests/application_tracking_test.go | 111 ++++++++++++++++++---
.../tests/{restclient.go => restclient_test.go} | 19 +++-
2 files changed, 114 insertions(+), 16 deletions(-)
diff --git a/pkg/scheduler/tests/application_tracking_test.go b/pkg/scheduler/tests/application_tracking_test.go
index 359b0043..9f38b2d5 100644
--- a/pkg/scheduler/tests/application_tracking_test.go
+++ b/pkg/scheduler/tests/application_tracking_test.go
@@ -19,6 +19,11 @@
package tests
import (
+ "bufio"
+ "encoding/json"
+ "errors"
+ "fmt"
+ "io"
"testing"
"time"
@@ -47,10 +52,16 @@ func TestApplicationHistoryTracking(t *testing.T) {
// Check queue events
client := RClient{}
- events, err := client.GetEvents()
+ stream, err := client.GetEventsStream(1000)
assert.NilError(t, err)
- assert.Equal(t, 2, len(events.EventRecords), "number of events generated")
- verifyQueueEvents(t, events.EventRecords)
+ defer stream.Close()
+ eventsDao, err := client.GetBatchEvents()
+ assert.NilError(t, err)
+ assert.Equal(t, 2, len(eventsDao.EventRecords), "number of events generated")
+ verifyQueueEvents(t, eventsDao.EventRecords)
+ events := getEventsFromStream(t, stream, 2)
+ assert.NilError(t, err)
+ verifyQueueEvents(t, events)
// Register a node & check events
err = ms.proxy.UpdateNode(&si.NodeRequest{
@@ -71,10 +82,13 @@ func TestApplicationHistoryTracking(t *testing.T) {
})
assert.NilError(t, err, "NodeRequest failed")
ms.mockRM.waitForAcceptedNode(t, "node-1:1234", 1000)
- events, err = client.GetEvents()
+ eventsDao, err = client.GetBatchEvents()
+ assert.NilError(t, err)
+ assert.Equal(t, 5, len(eventsDao.EventRecords), "number of events generated")
+ verifyNodeAddedAndQueueMaxSetEvents(t, eventsDao.EventRecords[2:])
+ events = getEventsFromStream(t, stream, 3)
assert.NilError(t, err)
- assert.Equal(t, 5, len(events.EventRecords), "number of events generated")
- verifyNodeAddedAndQueueMaxSetEvents(t, events.EventRecords[2:])
+ verifyNodeAddedAndQueueMaxSetEvents(t, events)
// Add application & check events
err = ms.proxy.UpdateApplication(&si.ApplicationRequest{
@@ -83,10 +97,13 @@ func TestApplicationHistoryTracking(t *testing.T) {
})
assert.NilError(t, err, "ApplicationRequest failed")
ms.mockRM.waitForAcceptedApplication(t, appID1, 1000)
- events, err = client.GetEvents()
+ eventsDao, err = client.GetBatchEvents()
assert.NilError(t, err)
- assert.Equal(t, 7, len(events.EventRecords), "number of events generated")
- verifyAppAddedEvents(t, events.EventRecords[5:])
+ assert.Equal(t, 7, len(eventsDao.EventRecords), "number of events generated")
+ verifyAppAddedEvents(t, eventsDao.EventRecords[5:])
+ events = getEventsFromStream(t, stream, 2)
+ assert.NilError(t, err)
+ verifyAppAddedEvents(t, events)
// Add allocation ask & check events
err = ms.proxy.UpdateAllocation(&si.AllocationRequest{
@@ -107,10 +124,12 @@ func TestApplicationHistoryTracking(t *testing.T) {
})
assert.NilError(t, err, "AllocationRequest failed")
ms.mockRM.waitForAllocations(t, 1, 1000)
- events, err = client.GetEvents()
+ eventsDao, err = client.GetBatchEvents()
assert.NilError(t, err)
- assert.Equal(t, 12, len(events.EventRecords), "number of events generated")
- verifyAllocationAskAddedEvents(t, events.EventRecords[7:])
+ assert.Equal(t, 12, len(eventsDao.EventRecords), "number of events generated")
+ verifyAllocationAskAddedEvents(t, eventsDao.EventRecords[7:])
+ events = getEventsFromStream(t, stream, 5)
+ verifyAllocationAskAddedEvents(t, events)
allocations := ms.mockRM.getAllocations()
assert.Equal(t, 1, len(allocations), "number of allocations")
@@ -142,10 +161,72 @@ func TestApplicationHistoryTracking(t *testing.T) {
})
assert.NilError(t, err, "timeout waiting for app state Completing")
- events, err = client.GetEvents()
+ eventsDao, err = client.GetBatchEvents()
+ assert.NilError(t, err)
+ assert.Equal(t, 15, len(eventsDao.EventRecords), "number of events generated")
+ verifyAllocationCancelledEvents(t, eventsDao.EventRecords[12:])
+ events = getEventsFromStream(t, stream, 3)
+ assert.NilError(t, err)
+ verifyAllocationCancelledEvents(t, events)
+}
+
+func getEventsFromStream(t *testing.T, stream io.ReadCloser, numEvents int) []*si.EventRecord {
+ lines, err := readLinesFromStream(stream, numEvents)
+ assert.NilError(t, err)
+
+ events, err := siEventFromJson(lines)
assert.NilError(t, err)
- assert.Equal(t, 15, len(events.EventRecords), "number of events generated")
- verifyAllocationCancelledEvents(t, events.EventRecords[12:])
+
+ return events
+}
+
+type scanResult struct {
+ lines []string
+ err error
+}
+
+func readLinesFromStream(stream io.ReadCloser, numLines int) ([]string, error) {
+ resp := make(chan scanResult)
+
+ go func() {
+ reader := bufio.NewReader(stream)
+ lines := make([]string, 0)
+ scanner := bufio.NewScanner(reader)
+ scanner.Split(bufio.ScanLines)
+
+ for i := 0; i < numLines; i++ {
+ if !scanner.Scan() {
+ resp <- scanResult{nil, scanner.Err()}
+ return
+ }
+ line := scanner.Text()
+ fmt.Println("Event stream - received:", line)
+ lines = append(lines, line)
+ }
+ resp <- scanResult{lines, nil}
+ }()
+
+ select {
+ case <-time.After(time.Second):
+ return nil, errors.New("timeout waiting for events on the event stream")
+ case r := <-resp:
+ return r.lines, r.err
+ }
+}
+
+func siEventFromJson(lines []string) ([]*si.EventRecord, error) {
+ var events []*si.EventRecord
+
+ for _, line := range lines {
+ var evt si.EventRecord
+ err := json.Unmarshal([]byte(line), &evt)
+ if err != nil {
+ return nil, err
+ }
+ events = append(events, &evt)
+ }
+
+ return events, nil
}
func verifyQueueEvents(t *testing.T, events []*si.EventRecord) {
diff --git a/pkg/scheduler/tests/restclient.go b/pkg/scheduler/tests/restclient_test.go
similarity index 76%
rename from pkg/scheduler/tests/restclient.go
rename to pkg/scheduler/tests/restclient_test.go
index 9ff09698..36240797 100644
--- a/pkg/scheduler/tests/restclient.go
+++ b/pkg/scheduler/tests/restclient_test.go
@@ -23,6 +23,7 @@ import (
"io"
"net/http"
"net/url"
+ "strconv"
"github.com/apache/yunikorn-core/pkg/webservice/dao"
)
@@ -31,7 +32,8 @@ type RClient struct {
BaseURL *url.URL
}
-func (c *RClient) GetEvents() (*dao.EventRecordDAO, error) {
+// GetBatchEvents returns history events from the batch interface
+func (c *RClient) GetBatchEvents() (*dao.EventRecordDAO, error) {
req, err := c.newRequest("GET", "ws/v1/events/batch")
if err != nil {
return nil, err
@@ -41,6 +43,21 @@ func (c *RClient) GetEvents() (*dao.EventRecordDAO, error) {
return events, err
}
+// GetEventsStream returns a persistent connection with a stream of events
+func (c *RClient) GetEventsStream(count uint64) (io.ReadCloser, error) {
+ req, err := c.newRequest("GET", "ws/v1/events/stream")
+ if err != nil {
+ return nil, err
+ }
+ req.URL.RawQuery = "count=" + strconv.FormatUint(count, 10)
+ resp, err := http.DefaultClient.Do(req)
+ if err != nil {
+ return nil, err
+ }
+
+ return resp.Body, nil
+}
+
func (c *RClient) newRequest(method, path string) (*http.Request, error) {
rel := &url.URL{Path: path}
wsUrl := &url.URL{
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]