This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/yunikorn-core.git


The following commit(s) were added to refs/heads/master by this push:
     new 770931fd [YUNIKORN-2454] Event streaming: send instanceUUID before the events (#818)
770931fd is described below

commit 770931fdd6ae196c2b106521a0d1a341cca68186
Author: Peter Bacsko <[email protected]>
AuthorDate: Sat Mar 2 02:05:42 2024 +0800

    [YUNIKORN-2454] Event streaming: send instanceUUID before the events (#818)
    
    Closes: #818
    
    Signed-off-by: Chia-Ping Tsai <[email protected]>
---
 pkg/scheduler/tests/application_tracking_test.go | 39 +++++++++++----
 pkg/webservice/dao/yk_uuid.go                    | 23 +++++++++
 pkg/webservice/handlers.go                       | 10 +++-
 pkg/webservice/handlers_test.go                  | 60 +++++++++++++++---------
 4 files changed, 101 insertions(+), 31 deletions(-)

diff --git a/pkg/scheduler/tests/application_tracking_test.go b/pkg/scheduler/tests/application_tracking_test.go
index b7f0ee12..6bfb76bf 100644
--- a/pkg/scheduler/tests/application_tracking_test.go
+++ b/pkg/scheduler/tests/application_tracking_test.go
@@ -24,12 +24,14 @@ import (
        "errors"
        "fmt"
        "io"
+       "strings"
        "testing"
        "time"
 
        "gotest.tools/v3/assert"
 
        "github.com/apache/yunikorn-core/pkg/common"
+       "github.com/apache/yunikorn-core/pkg/webservice/dao"
        "github.com/apache/yunikorn-scheduler-interface/lib/go/si"
 )
 
@@ -59,8 +61,9 @@ func TestApplicationHistoryTracking(t *testing.T) {
        assert.NilError(t, err)
        assert.Equal(t, 2, len(eventsDao.EventRecords), "number of events 
generated")
        verifyQueueEvents(t, eventsDao.EventRecords)
-       events := getEventsFromStream(t, stream, 2)
+       events, uuid := getEventsFromStream(t, true, stream, 3)
        assert.NilError(t, err)
+       assert.Assert(t, uuid != "")
        verifyQueueEvents(t, events)
 
        // Register a node & check events
@@ -86,7 +89,7 @@ func TestApplicationHistoryTracking(t *testing.T) {
        assert.NilError(t, err)
        assert.Equal(t, 5, len(eventsDao.EventRecords), "number of events 
generated")
        verifyNodeAddedAndQueueMaxSetEvents(t, eventsDao.EventRecords[2:])
-       events = getEventsFromStream(t, stream, 3)
+       events, _ = getEventsFromStream(t, false, stream, 3)
        assert.NilError(t, err)
        verifyNodeAddedAndQueueMaxSetEvents(t, events)
 
@@ -101,7 +104,7 @@ func TestApplicationHistoryTracking(t *testing.T) {
        assert.NilError(t, err)
        assert.Equal(t, 7, len(eventsDao.EventRecords), "number of events 
generated")
        verifyAppAddedEvents(t, eventsDao.EventRecords[5:])
-       events = getEventsFromStream(t, stream, 2)
+       events, _ = getEventsFromStream(t, false, stream, 2)
        assert.NilError(t, err)
        verifyAppAddedEvents(t, events)
 
@@ -128,7 +131,7 @@ func TestApplicationHistoryTracking(t *testing.T) {
        assert.NilError(t, err)
        assert.Equal(t, 13, len(eventsDao.EventRecords), "number of events 
generated")
        verifyAllocationAskAddedEvents(t, eventsDao.EventRecords[7:])
-       events = getEventsFromStream(t, stream, 6)
+       events, _ = getEventsFromStream(t, false, stream, 6)
        verifyAllocationAskAddedEvents(t, events)
 
        allocations := ms.mockRM.getAllocations()
@@ -165,19 +168,31 @@ func TestApplicationHistoryTracking(t *testing.T) {
        assert.NilError(t, err)
        assert.Equal(t, 17, len(eventsDao.EventRecords), "number of events 
generated")
        verifyAllocationCancelledEvents(t, eventsDao.EventRecords[13:])
-       events = getEventsFromStream(t, stream, 4)
+       events, _ = getEventsFromStream(t, false, stream, 4)
        assert.NilError(t, err)
        verifyAllocationCancelledEvents(t, events)
 }
 
-func getEventsFromStream(t *testing.T, stream io.ReadCloser, numEvents int) 
[]*si.EventRecord {
+func getEventsFromStream(t *testing.T, initial bool, stream io.ReadCloser, 
numEvents int) ([]*si.EventRecord, string) {
        lines, err := readLinesFromStream(stream, numEvents)
        assert.NilError(t, err)
 
-       events, err := siEventFromJson(lines)
-       assert.NilError(t, err)
+       var events []*si.EventRecord
+       var uuid string
+       if initial {
+               uuid, err = getInstanceUUID(lines[0])
+               assert.NilError(t, err)
+               events, err = siEventFromJson(lines[1:])
+               assert.NilError(t, err)
+       } else {
+               events, err = siEventFromJson(lines)
+               assert.NilError(t, err)
+               for _, s := range lines {
+                       assert.Assert(t, !strings.Contains(s, "InstanceUUID"), 
"unexpected InstanceUUID")
+               }
+       }
 
-       return events
+       return events, uuid
 }
 
 type scanResult struct {
@@ -229,6 +244,12 @@ func siEventFromJson(lines []string) ([]*si.EventRecord, 
error) {
        return events, nil
 }
 
+func getInstanceUUID(output string) (string, error) {
+       var id dao.YunikornID
+       err := json.Unmarshal([]byte(output), &id)
+       return id.InstanceUUID, err
+}
+
 func verifyQueueEvents(t *testing.T, events []*si.EventRecord) {
        assert.Equal(t, "root", events[0].ObjectID)
        assert.Equal(t, "", events[0].Message)
diff --git a/pkg/webservice/dao/yk_uuid.go b/pkg/webservice/dao/yk_uuid.go
new file mode 100644
index 00000000..ff9f5dfe
--- /dev/null
+++ b/pkg/webservice/dao/yk_uuid.go
@@ -0,0 +1,23 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+package dao
+
+type YunikornID struct {
+       InstanceUUID string
+}
diff --git a/pkg/webservice/handlers.go b/pkg/webservice/handlers.go
index a10bac53..ffea7844 100644
--- a/pkg/webservice/handlers.go
+++ b/pkg/webservice/handlers.go
@@ -1172,6 +1172,14 @@ func getStream(w http.ResponseWriter, r *http.Request) {
        stream := eventSystem.CreateEventStream(r.Host, count)
        defer eventSystem.RemoveStream(stream)
 
+       if err := enc.Encode(dao.YunikornID{
+               InstanceUUID: schedulerContext.GetUUID(),
+       }); err != nil {
+               buildJSONErrorResponse(w, err.Error(), 
http.StatusInternalServerError)
+               return
+       }
+       f.Flush()
+
        // Reading events in an infinite loop until either the client 
disconnects or Yunikorn closes the channel.
        // This results in a persistent HTTP connection where the message body 
is never closed.
        // Write deadline is adjusted before sending data to the client.
@@ -1186,7 +1194,7 @@ func getStream(w http.ResponseWriter, r *http.Request) {
                        if err != nil {
                                // should not fail at this point
                                log.Log(log.REST).Error("Cannot set write 
deadline", zap.Error(err))
-                               buildJSONErrorResponse(w, fmt.Sprintf("Cannot 
set write deadline: %v", err), http.StatusInternalServerError)
+                               buildJSONErrorResponse(w, fmt.Sprintf("Cannot 
set write deadline: %v", err), http.StatusOK) // status code is already 200 at 
this point
                                return
                        }
 
diff --git a/pkg/webservice/handlers_test.go b/pkg/webservice/handlers_test.go
index 0442619a..6745ce16 100644
--- a/pkg/webservice/handlers_test.go
+++ b/pkg/webservice/handlers_test.go
@@ -23,7 +23,6 @@ import (
        "encoding/json"
        "errors"
        "fmt"
-       "io"
        "net/http"
        "net/http/httptest"
        "reflect"
@@ -2034,6 +2033,7 @@ func TestGetEventsWhenTrackingDisabled(t *testing.T) {
 }
 
 func TestGetStream(t *testing.T) {
+       setup(t, configDefault, 1)
        ev, req := initEventsAndCreateRequest(t)
        defer ev.Stop()
        cancelCtx, cancel := context.WithCancel(context.Background())
@@ -2066,9 +2066,10 @@ func TestGetStream(t *testing.T) {
        assert.NilError(t, err, "cannot read response body")
 
        lines := strings.Split(string(output[:n]), "\n")
-       assertEvent(t, lines[0], 111, "app-1")
-       assertEvent(t, lines[1], 222, "node-1")
-       assertEvent(t, lines[2], 333, "app-2")
+       assertInstanceUUID(t, lines[0])
+       assertEvent(t, lines[1], 111, "app-1")
+       assertEvent(t, lines[2], 222, "node-1")
+       assertEvent(t, lines[3], 333, "app-2")
 }
 
 func TestGetStream_StreamClosedByProducer(t *testing.T) {
@@ -2093,8 +2094,9 @@ func TestGetStream_StreamClosedByProducer(t *testing.T) {
        assert.Equal(t, http.StatusOK, resp.Code)
        assert.NilError(t, err, "cannot read response body")
        lines := strings.Split(string(output[:n]), "\n")
-       assertEvent(t, lines[0], 111, "app-1")
-       assertYunikornError(t, lines[1], "Event stream was closed by the 
producer")
+       assertInstanceUUID(t, lines[0])
+       assertEvent(t, lines[1], 111, "app-1")
+       assertYunikornError(t, lines[2], "Event stream was closed by the 
producer")
 }
 
 func TestGetStream_NotFlusherImpl(t *testing.T) {
@@ -2110,6 +2112,7 @@ func TestGetStream_NotFlusherImpl(t *testing.T) {
 }
 
 func TestGetStream_Count(t *testing.T) {
+       setup(t, configDefault, 1)
        ev, req := initEventsAndCreateRequest(t)
        defer ev.Stop()
        cancelCtx, cancel := context.WithCancel(context.Background())
@@ -2131,8 +2134,8 @@ func TestGetStream_Count(t *testing.T) {
        getStream(resp, req)
        output := make([]byte, 256)
        n, err := resp.Body.Read(output)
-       assert.Error(t, io.EOF, err.Error())
-       assert.Equal(t, 0, n)
+       lines := strings.Split(string(output[:n]), "\n")
+       assertInstanceUUID(t, lines[0])
 
        // case #2: "count" is set to "2"
        req, err = http.NewRequest("GET", "/ws/v1/events/stream", 
strings.NewReader(""))
@@ -2149,9 +2152,10 @@ func TestGetStream_Count(t *testing.T) {
        output = make([]byte, 256)
        n, err = resp.Body.Read(output)
        assert.NilError(t, err)
-       lines := strings.Split(string(output[:n]), "\n")
-       assertEvent(t, lines[0], 1, "")
-       assertEvent(t, lines[1], 2, "")
+       lines = strings.Split(string(output[:n]), "\n")
+       assertInstanceUUID(t, lines[0])
+       assertEvent(t, lines[1], 1, "")
+       assertEvent(t, lines[2], 2, "")
 
        // case #3: illegal value
        req, err = http.NewRequest("GET", "/ws/v1/events/stream", 
strings.NewReader(""))
@@ -2182,7 +2186,7 @@ func TestGetStream_TrackingDisabled(t *testing.T) {
        _, req := initEventsAndCreateRequest(t)
        resp := httptest.NewRecorder()
 
-       assertGetStreamError(t, req, resp, http.StatusInternalServerError, 
"Event tracking is disabled")
+       assertGetStreamError(t, false, req, resp, 
http.StatusInternalServerError, "Event tracking is disabled")
 }
 
 func TestGetStream_NoWriteDeadline(t *testing.T) {
@@ -2190,10 +2194,11 @@ func TestGetStream_NoWriteDeadline(t *testing.T) {
        defer ev.Stop()
        resp := httptest.NewRecorder() // does not have SetWriteDeadline()
 
-       assertGetStreamError(t, req, resp, http.StatusInternalServerError, 
"Cannot set write deadline: feature not supported")
+       assertGetStreamError(t, false, req, resp, 
http.StatusInternalServerError, "Cannot set write deadline: feature not 
supported")
 }
 
 func TestGetStream_SetWriteDeadlineFails(t *testing.T) {
+       setup(t, configDefault, 1)
        ev, req := initEventsAndCreateRequest(t)
        defer ev.Stop()
        resp := NewResponseRecorderWithDeadline()
@@ -2209,7 +2214,7 @@ func TestGetStream_SetWriteDeadlineFails(t *testing.T) {
        }()
 
        getStream(resp, req)
-       checkGetStreamErrorResult(t, resp.Result(), 
http.StatusInternalServerError, "Cannot set write deadline: SetWriteDeadline 
failed")
+       checkGetStreamErrorResult(t, true, resp.Result(), http.StatusOK, 
"Cannot set write deadline: SetWriteDeadline failed")
 }
 
 func TestGetStream_SetReadDeadlineFails(t *testing.T) {
@@ -2217,7 +2222,7 @@ func TestGetStream_SetReadDeadlineFails(t *testing.T) {
        resp := NewResponseRecorderWithDeadline()
        resp.setReadFails = true
 
-       assertGetStreamError(t, req, resp, http.StatusInternalServerError, 
"Cannot set read deadline: SetReadDeadline failed")
+       assertGetStreamError(t, false, req, resp, 
http.StatusInternalServerError, "Cannot set read deadline: SetReadDeadline 
failed")
 }
 
 func TestGetStream_Limit(t *testing.T) {
@@ -2249,10 +2254,10 @@ func TestGetStream_Limit(t *testing.T) {
                return streamingLimiter.streams == 3
        })
        assert.NilError(t, err)
-       assertGetStreamError(t, req, resp, http.StatusServiceUnavailable, "Too 
many streaming connections")
+       assertGetStreamError(t, false, req, resp, 
http.StatusServiceUnavailable, "Too many streaming connections")
 }
 
-func assertGetStreamError(t *testing.T, req *http.Request, resp interface{}, 
statusCode int, expectedMsg string) {
+func assertGetStreamError(t *testing.T, withUUID bool, req *http.Request, resp 
interface{}, statusCode int, expectedMsg string) {
        t.Helper()
        var response *http.Response
 
@@ -2267,16 +2272,22 @@ func assertGetStreamError(t *testing.T, req 
*http.Request, resp interface{}, sta
                t.Fatalf("unknown response recorder type")
        }
 
-       checkGetStreamErrorResult(t, response, statusCode, expectedMsg)
+       checkGetStreamErrorResult(t, withUUID, response, statusCode, 
expectedMsg)
 }
 
-func checkGetStreamErrorResult(t *testing.T, response *http.Response, 
statusCode int, expectedMsg string) {
+func checkGetStreamErrorResult(t *testing.T, withUUID bool, response 
*http.Response, statusCode int, expectedMsg string) {
        t.Helper()
        output := make([]byte, 256)
        n, err := response.Body.Read(output)
        assert.NilError(t, err)
-       line := string(output[:n])
-       assertYunikornError(t, line, expectedMsg)
+       if withUUID {
+               lines := strings.Split(string(output[:n]), "\n")
+               assertInstanceUUID(t, lines[0])
+               assertYunikornError(t, lines[1], expectedMsg)
+       } else {
+               line := string(output[:n])
+               assertYunikornError(t, line, expectedMsg)
+       }
        assert.Equal(t, statusCode, response.StatusCode)
 }
 
@@ -2302,6 +2313,13 @@ func assertEvent(t *testing.T, output string, tsNano 
int64, objectID string) {
        assert.Equal(t, objectID, evt.ObjectID)
 }
 
+func assertInstanceUUID(t *testing.T, output string) {
+       var id dao.YunikornID
+       err := json.Unmarshal([]byte(output), &id)
+       assert.NilError(t, err)
+       assert.Assert(t, id.InstanceUUID != "")
+}
+
 func assertYunikornError(t *testing.T, output, errMsg string) {
        t.Helper()
        var ykErr dao.YAPIError


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to