This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/yunikorn-core.git
The following commit(s) were added to refs/heads/master by this push:
new 770931fd [YUNIKORN-2454] Event streaming: send instanceUUID before the events (#818)
770931fd is described below
commit 770931fdd6ae196c2b106521a0d1a341cca68186
Author: Peter Bacsko <[email protected]>
AuthorDate: Sat Mar 2 02:05:42 2024 +0800
[YUNIKORN-2454] Event streaming: send instanceUUID before the events (#818)
Closes: #818
Signed-off-by: Chia-Ping Tsai <[email protected]>
---
pkg/scheduler/tests/application_tracking_test.go | 39 +++++++++++----
pkg/webservice/dao/yk_uuid.go | 23 +++++++++
pkg/webservice/handlers.go | 10 +++-
pkg/webservice/handlers_test.go | 60 +++++++++++++++---------
4 files changed, 101 insertions(+), 31 deletions(-)
diff --git a/pkg/scheduler/tests/application_tracking_test.go b/pkg/scheduler/tests/application_tracking_test.go
index b7f0ee12..6bfb76bf 100644
--- a/pkg/scheduler/tests/application_tracking_test.go
+++ b/pkg/scheduler/tests/application_tracking_test.go
@@ -24,12 +24,14 @@ import (
"errors"
"fmt"
"io"
+ "strings"
"testing"
"time"
"gotest.tools/v3/assert"
"github.com/apache/yunikorn-core/pkg/common"
+ "github.com/apache/yunikorn-core/pkg/webservice/dao"
"github.com/apache/yunikorn-scheduler-interface/lib/go/si"
)
@@ -59,8 +61,9 @@ func TestApplicationHistoryTracking(t *testing.T) {
assert.NilError(t, err)
assert.Equal(t, 2, len(eventsDao.EventRecords), "number of events
generated")
verifyQueueEvents(t, eventsDao.EventRecords)
- events := getEventsFromStream(t, stream, 2)
+ events, uuid := getEventsFromStream(t, true, stream, 3)
assert.NilError(t, err)
+ assert.Assert(t, uuid != "")
verifyQueueEvents(t, events)
// Register a node & check events
@@ -86,7 +89,7 @@ func TestApplicationHistoryTracking(t *testing.T) {
assert.NilError(t, err)
assert.Equal(t, 5, len(eventsDao.EventRecords), "number of events
generated")
verifyNodeAddedAndQueueMaxSetEvents(t, eventsDao.EventRecords[2:])
- events = getEventsFromStream(t, stream, 3)
+ events, _ = getEventsFromStream(t, false, stream, 3)
assert.NilError(t, err)
verifyNodeAddedAndQueueMaxSetEvents(t, events)
@@ -101,7 +104,7 @@ func TestApplicationHistoryTracking(t *testing.T) {
assert.NilError(t, err)
assert.Equal(t, 7, len(eventsDao.EventRecords), "number of events
generated")
verifyAppAddedEvents(t, eventsDao.EventRecords[5:])
- events = getEventsFromStream(t, stream, 2)
+ events, _ = getEventsFromStream(t, false, stream, 2)
assert.NilError(t, err)
verifyAppAddedEvents(t, events)
@@ -128,7 +131,7 @@ func TestApplicationHistoryTracking(t *testing.T) {
assert.NilError(t, err)
assert.Equal(t, 13, len(eventsDao.EventRecords), "number of events
generated")
verifyAllocationAskAddedEvents(t, eventsDao.EventRecords[7:])
- events = getEventsFromStream(t, stream, 6)
+ events, _ = getEventsFromStream(t, false, stream, 6)
verifyAllocationAskAddedEvents(t, events)
allocations := ms.mockRM.getAllocations()
@@ -165,19 +168,31 @@ func TestApplicationHistoryTracking(t *testing.T) {
assert.NilError(t, err)
assert.Equal(t, 17, len(eventsDao.EventRecords), "number of events
generated")
verifyAllocationCancelledEvents(t, eventsDao.EventRecords[13:])
- events = getEventsFromStream(t, stream, 4)
+ events, _ = getEventsFromStream(t, false, stream, 4)
assert.NilError(t, err)
verifyAllocationCancelledEvents(t, events)
}
-func getEventsFromStream(t *testing.T, stream io.ReadCloser, numEvents int)
[]*si.EventRecord {
+func getEventsFromStream(t *testing.T, initial bool, stream io.ReadCloser,
numEvents int) ([]*si.EventRecord, string) {
lines, err := readLinesFromStream(stream, numEvents)
assert.NilError(t, err)
- events, err := siEventFromJson(lines)
- assert.NilError(t, err)
+ var events []*si.EventRecord
+ var uuid string
+ if initial {
+ uuid, err = getInstanceUUID(lines[0])
+ assert.NilError(t, err)
+ events, err = siEventFromJson(lines[1:])
+ assert.NilError(t, err)
+ } else {
+ events, err = siEventFromJson(lines)
+ assert.NilError(t, err)
+ for _, s := range lines {
+ assert.Assert(t, !strings.Contains(s, "InstanceUUID"),
"unexpected InstanceUUID")
+ }
+ }
- return events
+ return events, uuid
}
type scanResult struct {
@@ -229,6 +244,12 @@ func siEventFromJson(lines []string) ([]*si.EventRecord,
error) {
return events, nil
}
+func getInstanceUUID(output string) (string, error) {
+ var id dao.YunikornID
+ err := json.Unmarshal([]byte(output), &id)
+ return id.InstanceUUID, err
+}
+
func verifyQueueEvents(t *testing.T, events []*si.EventRecord) {
assert.Equal(t, "root", events[0].ObjectID)
assert.Equal(t, "", events[0].Message)
diff --git a/pkg/webservice/dao/yk_uuid.go b/pkg/webservice/dao/yk_uuid.go
new file mode 100644
index 00000000..ff9f5dfe
--- /dev/null
+++ b/pkg/webservice/dao/yk_uuid.go
@@ -0,0 +1,23 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+package dao
+
+type YunikornID struct {
+ InstanceUUID string
+}
diff --git a/pkg/webservice/handlers.go b/pkg/webservice/handlers.go
index a10bac53..ffea7844 100644
--- a/pkg/webservice/handlers.go
+++ b/pkg/webservice/handlers.go
@@ -1172,6 +1172,14 @@ func getStream(w http.ResponseWriter, r *http.Request) {
stream := eventSystem.CreateEventStream(r.Host, count)
defer eventSystem.RemoveStream(stream)
+ if err := enc.Encode(dao.YunikornID{
+ InstanceUUID: schedulerContext.GetUUID(),
+ }); err != nil {
+ buildJSONErrorResponse(w, err.Error(),
http.StatusInternalServerError)
+ return
+ }
+ f.Flush()
+
// Reading events in an infinite loop until either the client disconnects or Yunikorn closes the channel.
// This results in a persistent HTTP connection where the message body is never closed.
// Write deadline is adjusted before sending data to the client.
@@ -1186,7 +1194,7 @@ func getStream(w http.ResponseWriter, r *http.Request) {
if err != nil {
// should not fail at this point
log.Log(log.REST).Error("Cannot set write
deadline", zap.Error(err))
- buildJSONErrorResponse(w, fmt.Sprintf("Cannot
set write deadline: %v", err), http.StatusInternalServerError)
+ buildJSONErrorResponse(w, fmt.Sprintf("Cannot
set write deadline: %v", err), http.StatusOK) // status code is already 200 at
this point
return
}
diff --git a/pkg/webservice/handlers_test.go b/pkg/webservice/handlers_test.go
index 0442619a..6745ce16 100644
--- a/pkg/webservice/handlers_test.go
+++ b/pkg/webservice/handlers_test.go
@@ -23,7 +23,6 @@ import (
"encoding/json"
"errors"
"fmt"
- "io"
"net/http"
"net/http/httptest"
"reflect"
@@ -2034,6 +2033,7 @@ func TestGetEventsWhenTrackingDisabled(t *testing.T) {
}
func TestGetStream(t *testing.T) {
+ setup(t, configDefault, 1)
ev, req := initEventsAndCreateRequest(t)
defer ev.Stop()
cancelCtx, cancel := context.WithCancel(context.Background())
@@ -2066,9 +2066,10 @@ func TestGetStream(t *testing.T) {
assert.NilError(t, err, "cannot read response body")
lines := strings.Split(string(output[:n]), "\n")
- assertEvent(t, lines[0], 111, "app-1")
- assertEvent(t, lines[1], 222, "node-1")
- assertEvent(t, lines[2], 333, "app-2")
+ assertInstanceUUID(t, lines[0])
+ assertEvent(t, lines[1], 111, "app-1")
+ assertEvent(t, lines[2], 222, "node-1")
+ assertEvent(t, lines[3], 333, "app-2")
}
func TestGetStream_StreamClosedByProducer(t *testing.T) {
@@ -2093,8 +2094,9 @@ func TestGetStream_StreamClosedByProducer(t *testing.T) {
assert.Equal(t, http.StatusOK, resp.Code)
assert.NilError(t, err, "cannot read response body")
lines := strings.Split(string(output[:n]), "\n")
- assertEvent(t, lines[0], 111, "app-1")
- assertYunikornError(t, lines[1], "Event stream was closed by the
producer")
+ assertInstanceUUID(t, lines[0])
+ assertEvent(t, lines[1], 111, "app-1")
+ assertYunikornError(t, lines[2], "Event stream was closed by the
producer")
}
func TestGetStream_NotFlusherImpl(t *testing.T) {
@@ -2110,6 +2112,7 @@ func TestGetStream_NotFlusherImpl(t *testing.T) {
}
func TestGetStream_Count(t *testing.T) {
+ setup(t, configDefault, 1)
ev, req := initEventsAndCreateRequest(t)
defer ev.Stop()
cancelCtx, cancel := context.WithCancel(context.Background())
@@ -2131,8 +2134,8 @@ func TestGetStream_Count(t *testing.T) {
getStream(resp, req)
output := make([]byte, 256)
n, err := resp.Body.Read(output)
- assert.Error(t, io.EOF, err.Error())
- assert.Equal(t, 0, n)
+ lines := strings.Split(string(output[:n]), "\n")
+ assertInstanceUUID(t, lines[0])
// case #2: "count" is set to "2"
req, err = http.NewRequest("GET", "/ws/v1/events/stream",
strings.NewReader(""))
@@ -2149,9 +2152,10 @@ func TestGetStream_Count(t *testing.T) {
output = make([]byte, 256)
n, err = resp.Body.Read(output)
assert.NilError(t, err)
- lines := strings.Split(string(output[:n]), "\n")
- assertEvent(t, lines[0], 1, "")
- assertEvent(t, lines[1], 2, "")
+ lines = strings.Split(string(output[:n]), "\n")
+ assertInstanceUUID(t, lines[0])
+ assertEvent(t, lines[1], 1, "")
+ assertEvent(t, lines[2], 2, "")
// case #3: illegal value
req, err = http.NewRequest("GET", "/ws/v1/events/stream",
strings.NewReader(""))
@@ -2182,7 +2186,7 @@ func TestGetStream_TrackingDisabled(t *testing.T) {
_, req := initEventsAndCreateRequest(t)
resp := httptest.NewRecorder()
- assertGetStreamError(t, req, resp, http.StatusInternalServerError,
"Event tracking is disabled")
+ assertGetStreamError(t, false, req, resp,
http.StatusInternalServerError, "Event tracking is disabled")
}
func TestGetStream_NoWriteDeadline(t *testing.T) {
@@ -2190,10 +2194,11 @@ func TestGetStream_NoWriteDeadline(t *testing.T) {
defer ev.Stop()
resp := httptest.NewRecorder() // does not have SetWriteDeadline()
- assertGetStreamError(t, req, resp, http.StatusInternalServerError,
"Cannot set write deadline: feature not supported")
+ assertGetStreamError(t, false, req, resp,
http.StatusInternalServerError, "Cannot set write deadline: feature not
supported")
}
func TestGetStream_SetWriteDeadlineFails(t *testing.T) {
+ setup(t, configDefault, 1)
ev, req := initEventsAndCreateRequest(t)
defer ev.Stop()
resp := NewResponseRecorderWithDeadline()
@@ -2209,7 +2214,7 @@ func TestGetStream_SetWriteDeadlineFails(t *testing.T) {
}()
getStream(resp, req)
- checkGetStreamErrorResult(t, resp.Result(),
http.StatusInternalServerError, "Cannot set write deadline: SetWriteDeadline
failed")
+ checkGetStreamErrorResult(t, true, resp.Result(), http.StatusOK,
"Cannot set write deadline: SetWriteDeadline failed")
}
func TestGetStream_SetReadDeadlineFails(t *testing.T) {
@@ -2217,7 +2222,7 @@ func TestGetStream_SetReadDeadlineFails(t *testing.T) {
resp := NewResponseRecorderWithDeadline()
resp.setReadFails = true
- assertGetStreamError(t, req, resp, http.StatusInternalServerError,
"Cannot set read deadline: SetReadDeadline failed")
+ assertGetStreamError(t, false, req, resp,
http.StatusInternalServerError, "Cannot set read deadline: SetReadDeadline
failed")
}
func TestGetStream_Limit(t *testing.T) {
@@ -2249,10 +2254,10 @@ func TestGetStream_Limit(t *testing.T) {
return streamingLimiter.streams == 3
})
assert.NilError(t, err)
- assertGetStreamError(t, req, resp, http.StatusServiceUnavailable, "Too
many streaming connections")
+ assertGetStreamError(t, false, req, resp,
http.StatusServiceUnavailable, "Too many streaming connections")
}
-func assertGetStreamError(t *testing.T, req *http.Request, resp interface{},
statusCode int, expectedMsg string) {
+func assertGetStreamError(t *testing.T, withUUID bool, req *http.Request, resp
interface{}, statusCode int, expectedMsg string) {
t.Helper()
var response *http.Response
@@ -2267,16 +2272,22 @@ func assertGetStreamError(t *testing.T, req
*http.Request, resp interface{}, sta
t.Fatalf("unknown response recorder type")
}
- checkGetStreamErrorResult(t, response, statusCode, expectedMsg)
+ checkGetStreamErrorResult(t, withUUID, response, statusCode,
expectedMsg)
}
-func checkGetStreamErrorResult(t *testing.T, response *http.Response,
statusCode int, expectedMsg string) {
+func checkGetStreamErrorResult(t *testing.T, withUUID bool, response
*http.Response, statusCode int, expectedMsg string) {
t.Helper()
output := make([]byte, 256)
n, err := response.Body.Read(output)
assert.NilError(t, err)
- line := string(output[:n])
- assertYunikornError(t, line, expectedMsg)
+ if withUUID {
+ lines := strings.Split(string(output[:n]), "\n")
+ assertInstanceUUID(t, lines[0])
+ assertYunikornError(t, lines[1], expectedMsg)
+ } else {
+ line := string(output[:n])
+ assertYunikornError(t, line, expectedMsg)
+ }
assert.Equal(t, statusCode, response.StatusCode)
}
@@ -2302,6 +2313,13 @@ func assertEvent(t *testing.T, output string, tsNano
int64, objectID string) {
assert.Equal(t, objectID, evt.ObjectID)
}
+func assertInstanceUUID(t *testing.T, output string) {
+ var id dao.YunikornID
+ err := json.Unmarshal([]byte(output), &id)
+ assert.NilError(t, err)
+ assert.Assert(t, id.InstanceUUID != "")
+}
+
func assertYunikornError(t *testing.T, output, errMsg string) {
t.Helper()
var ykErr dao.YAPIError
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]