This is an automated email from the ASF dual-hosted git repository.
pbacsko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/yunikorn-core.git
The following commit(s) were added to refs/heads/master by this push:
new a786feb5 [YUNIKORN-2515] Add property event.RESTResponseSize to the
batch event handler (#886)
a786feb5 is described below
commit a786feb5761be28e802d08976d224c40639cd86b
Author: Peter Bacsko <[email protected]>
AuthorDate: Wed Jun 12 19:18:03 2024 +0200
[YUNIKORN-2515] Add property event.RESTResponseSize to the batch event
handler (#886)
Closes: #886
Signed-off-by: Peter Bacsko <[email protected]>
---
pkg/common/configs/configs.go | 2 ++
pkg/webservice/handlers.go | 27 ++++++++++++++++++++++++---
pkg/webservice/handlers_test.go | 33 +++++++++++++++++++++++++++++++++
3 files changed, 59 insertions(+), 3 deletions(-)
diff --git a/pkg/common/configs/configs.go b/pkg/common/configs/configs.go
index 8d591afe..83e8ec8c 100644
--- a/pkg/common/configs/configs.go
+++ b/pkg/common/configs/configs.go
@@ -38,6 +38,7 @@ const (
CMEventRingBufferCapacity = PrefixEvent + "ringBufferCapacity" // Ring
Buffer Capacity
CMMaxEventStreams = PrefixEvent + "maxStreams"
CMMaxEventStreamsPerHost = PrefixEvent + "maxStreamsPerHost"
+ CMRESTResponseSize = PrefixEvent + "RESTResponseSize"
// defaults
DefaultHealthCheckInterval = 30 * time.Second
@@ -46,6 +47,7 @@ const (
DefaultEventRingBufferCapacity = 100000
DefaultMaxStreams = uint64(100)
DefaultMaxStreamsPerHost = uint64(15)
+ DefaultRESTResponseSize = uint64(10000)
)
var ConfigContext *SchedulerConfigContext
diff --git a/pkg/webservice/handlers.go b/pkg/webservice/handlers.go
index 5e37c80f..6bb031b0 100644
--- a/pkg/webservice/handlers.go
+++ b/pkg/webservice/handlers.go
@@ -28,6 +28,7 @@ import (
"sort"
"strconv"
"strings"
+ "sync/atomic"
"time"
"github.com/julienschmidt/httprouter"
@@ -65,6 +66,7 @@ const (
var allowedActiveStatusMsg string
var allowedAppActiveStatuses map[string]bool
var streamingLimiter *StreamingLimiter
+var maxRESTResponseSize atomic.Uint64
func init() {
allowedAppActiveStatuses = make(map[string]bool)
@@ -83,6 +85,22 @@ func init() {
allowedActiveStatusMsg = fmt.Sprintf("Only following active statuses
are allowed: %s", strings.Join(activeStatuses, ","))
streamingLimiter = NewStreamingLimiter()
+
+ configs.AddConfigMapCallback("rest-response-size", func() {
+ newSize := common.GetConfigurationUint(configs.GetConfigMap(),
configs.CMRESTResponseSize, configs.DefaultRESTResponseSize)
+ if newSize == 0 {
+ log.Log(log.REST).Warn("Illegal value `0` for config
key, using default",
+ zap.String("key", configs.CMRESTResponseSize),
+ zap.Uint64("default",
configs.DefaultRESTResponseSize))
+ newSize = configs.DefaultRESTResponseSize
+ }
+
+ log.Log(log.REST).Info("Reloading max REST event response size
setting",
+ zap.Uint64("current", maxRESTResponseSize.Load()),
+ zap.Uint64("new", newSize))
+ maxRESTResponseSize.Store(newSize)
+ })
+ maxRESTResponseSize.Store(configs.DefaultRESTResponseSize)
}
func getStackInfo(w http.ResponseWriter, r *http.Request) {
@@ -1117,9 +1135,8 @@ func getEvents(w http.ResponseWriter, r *http.Request) {
return
}
- count := uint64(10000)
- var start uint64
-
+ maxCount := maxRESTResponseSize.Load()
+ count := maxCount
if countStr := r.URL.Query().Get("count"); countStr != "" {
var err error
count, err = strconv.ParseUint(countStr, 10, 64)
@@ -1127,12 +1144,16 @@ func getEvents(w http.ResponseWriter, r *http.Request) {
buildJSONErrorResponse(w, err.Error(),
http.StatusBadRequest)
return
}
+ if count > maxCount {
+ count = maxCount
+ }
if count == 0 {
buildJSONErrorResponse(w, `0 is not a valid value for
"count"`, http.StatusBadRequest)
return
}
}
+ var start uint64
if startStr := r.URL.Query().Get("start"); startStr != "" {
var err error
start, err = strconv.ParseUint(startStr, 10, 64)
diff --git a/pkg/webservice/handlers_test.go b/pkg/webservice/handlers_test.go
index dfbe729b..3e72843a 100644
--- a/pkg/webservice/handlers_test.go
+++ b/pkg/webservice/handlers_test.go
@@ -2064,6 +2064,11 @@ func TestGetEvents(t *testing.T) {
checkIllegalBatchRequest(t, "count=0", `0 is not a valid value for
"count`)
checkIllegalBatchRequest(t, "start=xyz", `strconv.ParseUint: parsing
"xyz": invalid syntax`)
checkIllegalBatchRequest(t, "start=-100", `strconv.ParseUint: parsing
"-100": invalid syntax`)
+
+ // "count" too high
+ maxRESTResponseSize.Store(1)
+ defer maxRESTResponseSize.Store(configs.DefaultRESTResponseSize)
+ checkSingleEvent(t, appEvent, "count=3")
}
func TestGetEventsWhenTrackingDisabled(t *testing.T) {
@@ -2681,6 +2686,34 @@ func TestGetPartitionRuleHandler(t *testing.T) {
assert.Equal(t, partitionRules[3].Name, types.Recovery)
}
+func TestSetMaxRESTResponseSize(t *testing.T) {
+ current := configs.GetConfigMap()
+ defer configs.SetConfigMap(current)
+
+ configs.SetConfigMap(map[string]string{
+ configs.CMRESTResponseSize: "1234",
+ })
+ assert.Equal(t, uint64(1234), maxRESTResponseSize.Load())
+
+ configs.SetConfigMap(map[string]string{})
+ assert.Equal(t, uint64(10000), maxRESTResponseSize.Load())
+
+ configs.SetConfigMap(map[string]string{
+ configs.CMRESTResponseSize: "xyz",
+ })
+ assert.Equal(t, uint64(10000), maxRESTResponseSize.Load())
+
+ configs.SetConfigMap(map[string]string{
+ configs.CMRESTResponseSize: "0",
+ })
+ assert.Equal(t, uint64(10000), maxRESTResponseSize.Load())
+
+ configs.SetConfigMap(map[string]string{
+ configs.CMRESTResponseSize: "-1",
+ })
+ assert.Equal(t, uint64(10000), maxRESTResponseSize.Load())
+}
+
type ResponseRecorderWithDeadline struct {
*httptest.ResponseRecorder
setWriteFails bool
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]