This is an automated email from the ASF dual-hosted git repository.

zhuqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/yunikorn-core.git


The following commit(s) were added to refs/heads/master by this push:
     new 433d5462 [YUNIKORN-2118] Add smoke test for event streaming (#705)
433d5462 is described below

commit 433d5462453276f983f852c21b99442c95394031
Author: Peter Bacsko <[email protected]>
AuthorDate: Fri Feb 2 12:02:36 2024 +0800

    [YUNIKORN-2118] Add smoke test for event streaming (#705)
    
    Code review
    
    Closes: #705
    
    Signed-off-by: qzhu <[email protected]>
---
 pkg/scheduler/tests/application_tracking_test.go   | 111 ++++++++++++++++++---
 .../tests/{restclient.go => restclient_test.go}    |  19 +++-
 2 files changed, 114 insertions(+), 16 deletions(-)

diff --git a/pkg/scheduler/tests/application_tracking_test.go b/pkg/scheduler/tests/application_tracking_test.go
index 359b0043..9f38b2d5 100644
--- a/pkg/scheduler/tests/application_tracking_test.go
+++ b/pkg/scheduler/tests/application_tracking_test.go
@@ -19,6 +19,11 @@
 package tests
 
 import (
+       "bufio"
+       "encoding/json"
+       "errors"
+       "fmt"
+       "io"
        "testing"
        "time"
 
@@ -47,10 +52,16 @@ func TestApplicationHistoryTracking(t *testing.T) {
 
        // Check queue events
        client := RClient{}
-       events, err := client.GetEvents()
+       stream, err := client.GetEventsStream(1000)
        assert.NilError(t, err)
-       assert.Equal(t, 2, len(events.EventRecords), "number of events generated")
-       verifyQueueEvents(t, events.EventRecords)
+       defer stream.Close()
+       eventsDao, err := client.GetBatchEvents()
+       assert.NilError(t, err)
+       assert.Equal(t, 2, len(eventsDao.EventRecords), "number of events generated")
+       verifyQueueEvents(t, eventsDao.EventRecords)
+       events := getEventsFromStream(t, stream, 2)
+       assert.NilError(t, err)
+       verifyQueueEvents(t, events)
 
        // Register a node & check events
        err = ms.proxy.UpdateNode(&si.NodeRequest{
@@ -71,10 +82,13 @@ func TestApplicationHistoryTracking(t *testing.T) {
        })
        assert.NilError(t, err, "NodeRequest failed")
        ms.mockRM.waitForAcceptedNode(t, "node-1:1234", 1000)
-       events, err = client.GetEvents()
+       eventsDao, err = client.GetBatchEvents()
+       assert.NilError(t, err)
+       assert.Equal(t, 5, len(eventsDao.EventRecords), "number of events generated")
+       verifyNodeAddedAndQueueMaxSetEvents(t, eventsDao.EventRecords[2:])
+       events = getEventsFromStream(t, stream, 3)
        assert.NilError(t, err)
-       assert.Equal(t, 5, len(events.EventRecords), "number of events generated")
-       verifyNodeAddedAndQueueMaxSetEvents(t, events.EventRecords[2:])
+       verifyNodeAddedAndQueueMaxSetEvents(t, events)
 
        // Add application & check events
        err = ms.proxy.UpdateApplication(&si.ApplicationRequest{
@@ -83,10 +97,13 @@ func TestApplicationHistoryTracking(t *testing.T) {
        })
        assert.NilError(t, err, "ApplicationRequest failed")
        ms.mockRM.waitForAcceptedApplication(t, appID1, 1000)
-       events, err = client.GetEvents()
+       eventsDao, err = client.GetBatchEvents()
        assert.NilError(t, err)
-       assert.Equal(t, 7, len(events.EventRecords), "number of events generated")
-       verifyAppAddedEvents(t, events.EventRecords[5:])
+       assert.Equal(t, 7, len(eventsDao.EventRecords), "number of events generated")
+       verifyAppAddedEvents(t, eventsDao.EventRecords[5:])
+       events = getEventsFromStream(t, stream, 2)
+       assert.NilError(t, err)
+       verifyAppAddedEvents(t, events)
 
        // Add allocation ask & check events
        err = ms.proxy.UpdateAllocation(&si.AllocationRequest{
@@ -107,10 +124,12 @@ func TestApplicationHistoryTracking(t *testing.T) {
        })
        assert.NilError(t, err, "AllocationRequest failed")
        ms.mockRM.waitForAllocations(t, 1, 1000)
-       events, err = client.GetEvents()
+       eventsDao, err = client.GetBatchEvents()
        assert.NilError(t, err)
-       assert.Equal(t, 12, len(events.EventRecords), "number of events generated")
-       verifyAllocationAskAddedEvents(t, events.EventRecords[7:])
+       assert.Equal(t, 12, len(eventsDao.EventRecords), "number of events generated")
+       verifyAllocationAskAddedEvents(t, eventsDao.EventRecords[7:])
+       events = getEventsFromStream(t, stream, 5)
+       verifyAllocationAskAddedEvents(t, events)
 
        allocations := ms.mockRM.getAllocations()
        assert.Equal(t, 1, len(allocations), "number of allocations")
@@ -142,10 +161,72 @@ func TestApplicationHistoryTracking(t *testing.T) {
        })
        assert.NilError(t, err, "timeout waiting for app state Completing")
 
-       events, err = client.GetEvents()
+       eventsDao, err = client.GetBatchEvents()
+       assert.NilError(t, err)
+       assert.Equal(t, 15, len(eventsDao.EventRecords), "number of events generated")
+       verifyAllocationCancelledEvents(t, eventsDao.EventRecords[12:])
+       events = getEventsFromStream(t, stream, 3)
+       assert.NilError(t, err)
+       verifyAllocationCancelledEvents(t, events)
+}
+
+func getEventsFromStream(t *testing.T, stream io.ReadCloser, numEvents int) []*si.EventRecord {
+       lines, err := readLinesFromStream(stream, numEvents)
+       assert.NilError(t, err)
+
+       events, err := siEventFromJson(lines)
        assert.NilError(t, err)
-       assert.Equal(t, 15, len(events.EventRecords), "number of events generated")
-       verifyAllocationCancelledEvents(t, events.EventRecords[12:])
+
+       return events
+}
+
+type scanResult struct {
+       lines []string
+       err   error
+}
+
+func readLinesFromStream(stream io.ReadCloser, numLines int) ([]string, error) {
+       resp := make(chan scanResult)
+
+       go func() {
+               reader := bufio.NewReader(stream)
+               lines := make([]string, 0)
+               scanner := bufio.NewScanner(reader)
+               scanner.Split(bufio.ScanLines)
+
+               for i := 0; i < numLines; i++ {
+                       if !scanner.Scan() {
+                               resp <- scanResult{nil, scanner.Err()}
+                               return
+                       }
+                       line := scanner.Text()
+                       fmt.Println("Event stream - received:", line)
+                       lines = append(lines, line)
+               }
+               resp <- scanResult{lines, nil}
+       }()
+
+       select {
+       case <-time.After(time.Second):
+               return nil, errors.New("timeout waiting for events on the event stream")
+       case r := <-resp:
+               return r.lines, r.err
+       }
+}
+
+func siEventFromJson(lines []string) ([]*si.EventRecord, error) {
+       var events []*si.EventRecord
+
+       for _, line := range lines {
+               var evt si.EventRecord
+               err := json.Unmarshal([]byte(line), &evt)
+               if err != nil {
+                       return nil, err
+               }
+               events = append(events, &evt)
+       }
+
+       return events, nil
 }
 
 func verifyQueueEvents(t *testing.T, events []*si.EventRecord) {
diff --git a/pkg/scheduler/tests/restclient.go b/pkg/scheduler/tests/restclient_test.go
similarity index 76%
rename from pkg/scheduler/tests/restclient.go
rename to pkg/scheduler/tests/restclient_test.go
index 9ff09698..36240797 100644
--- a/pkg/scheduler/tests/restclient.go
+++ b/pkg/scheduler/tests/restclient_test.go
@@ -23,6 +23,7 @@ import (
        "io"
        "net/http"
        "net/url"
+       "strconv"
 
        "github.com/apache/yunikorn-core/pkg/webservice/dao"
 )
@@ -31,7 +32,8 @@ type RClient struct {
        BaseURL *url.URL
 }
 
-func (c *RClient) GetEvents() (*dao.EventRecordDAO, error) {
+// GetBatchEvents returns history events from the batch interface
+func (c *RClient) GetBatchEvents() (*dao.EventRecordDAO, error) {
        req, err := c.newRequest("GET", "ws/v1/events/batch")
        if err != nil {
                return nil, err
@@ -41,6 +43,21 @@ func (c *RClient) GetEvents() (*dao.EventRecordDAO, error) {
        return events, err
 }
 
+// GetEventsStream returns a persistent connection with a stream of events
+func (c *RClient) GetEventsStream(count uint64) (io.ReadCloser, error) {
+       req, err := c.newRequest("GET", "ws/v1/events/stream")
+       if err != nil {
+               return nil, err
+       }
+       req.URL.RawQuery = "count=" + strconv.FormatUint(count, 10)
+       resp, err := http.DefaultClient.Do(req)
+       if err != nil {
+               return nil, err
+       }
+
+       return resp.Body, nil
+}
+
 func (c *RClient) newRequest(method, path string) (*http.Request, error) {
        rel := &url.URL{Path: path}
        wsUrl := &url.URL{


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to