This is an automated email from the ASF dual-hosted git repository.

pbacsko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/yunikorn-core.git


The following commit(s) were added to refs/heads/master by this push:
     new a786feb5 [YUNIKORN-2515] Add property event.RESTResponseSize to the batch event handler (#886)
a786feb5 is described below

commit a786feb5761be28e802d08976d224c40639cd86b
Author: Peter Bacsko <[email protected]>
AuthorDate: Wed Jun 12 19:18:03 2024 +0200

    [YUNIKORN-2515] Add property event.RESTResponseSize to the batch event handler (#886)
    
    Closes: #886
    
    Signed-off-by: Peter Bacsko <[email protected]>
---
 pkg/common/configs/configs.go   |  2 ++
 pkg/webservice/handlers.go      | 27 ++++++++++++++++++++++++---
 pkg/webservice/handlers_test.go | 33 +++++++++++++++++++++++++++++++++
 3 files changed, 59 insertions(+), 3 deletions(-)

diff --git a/pkg/common/configs/configs.go b/pkg/common/configs/configs.go
index 8d591afe..83e8ec8c 100644
--- a/pkg/common/configs/configs.go
+++ b/pkg/common/configs/configs.go
@@ -38,6 +38,7 @@ const (
        CMEventRingBufferCapacity = PrefixEvent + "ringBufferCapacity" // Ring Buffer Capacity
        CMMaxEventStreams         = PrefixEvent + "maxStreams"
        CMMaxEventStreamsPerHost  = PrefixEvent + "maxStreamsPerHost"
+       CMRESTResponseSize        = PrefixEvent + "RESTResponseSize"
 
        // defaults
        DefaultHealthCheckInterval     = 30 * time.Second
@@ -46,6 +47,7 @@ const (
        DefaultEventRingBufferCapacity = 100000
        DefaultMaxStreams              = uint64(100)
        DefaultMaxStreamsPerHost       = uint64(15)
+       DefaultRESTResponseSize        = uint64(10000)
 )
 
 var ConfigContext *SchedulerConfigContext
diff --git a/pkg/webservice/handlers.go b/pkg/webservice/handlers.go
index 5e37c80f..6bb031b0 100644
--- a/pkg/webservice/handlers.go
+++ b/pkg/webservice/handlers.go
@@ -28,6 +28,7 @@ import (
        "sort"
        "strconv"
        "strings"
+       "sync/atomic"
        "time"
 
        "github.com/julienschmidt/httprouter"
@@ -65,6 +66,7 @@ const (
 var allowedActiveStatusMsg string
 var allowedAppActiveStatuses map[string]bool
 var streamingLimiter *StreamingLimiter
+var maxRESTResponseSize atomic.Uint64
 
 func init() {
        allowedAppActiveStatuses = make(map[string]bool)
@@ -83,6 +85,22 @@ func init() {
        allowedActiveStatusMsg = fmt.Sprintf("Only following active statuses are allowed: %s", strings.Join(activeStatuses, ","))
 
        streamingLimiter = NewStreamingLimiter()
+
+       configs.AddConfigMapCallback("rest-response-size", func() {
+               newSize := common.GetConfigurationUint(configs.GetConfigMap(), configs.CMRESTResponseSize, configs.DefaultRESTResponseSize)
+               if newSize == 0 {
+                       log.Log(log.REST).Warn("Illegal value `0` for config key, using default",
+                               zap.String("key", configs.CMRESTResponseSize),
+                               zap.Uint64("default", configs.DefaultRESTResponseSize))
+                       newSize = configs.DefaultRESTResponseSize
+               }
+
+               log.Log(log.REST).Info("Reloading max REST event response size setting",
+                       zap.Uint64("current", maxRESTResponseSize.Load()),
+                       zap.Uint64("new", newSize))
+               maxRESTResponseSize.Store(newSize)
+       })
+       maxRESTResponseSize.Store(configs.DefaultRESTResponseSize)
 }
 
 func getStackInfo(w http.ResponseWriter, r *http.Request) {
@@ -1117,9 +1135,8 @@ func getEvents(w http.ResponseWriter, r *http.Request) {
                return
        }
 
-       count := uint64(10000)
-       var start uint64
-
+       maxCount := maxRESTResponseSize.Load()
+       count := maxCount
        if countStr := r.URL.Query().Get("count"); countStr != "" {
                var err error
                count, err = strconv.ParseUint(countStr, 10, 64)
@@ -1127,12 +1144,16 @@ func getEvents(w http.ResponseWriter, r *http.Request) {
                        buildJSONErrorResponse(w, err.Error(), http.StatusBadRequest)
                        return
                }
+               if count > maxCount {
+                       count = maxCount
+               }
                if count == 0 {
                        buildJSONErrorResponse(w, `0 is not a valid value for "count"`, http.StatusBadRequest)
                        return
                }
        }
 
+       var start uint64
        if startStr := r.URL.Query().Get("start"); startStr != "" {
                var err error
                start, err = strconv.ParseUint(startStr, 10, 64)
diff --git a/pkg/webservice/handlers_test.go b/pkg/webservice/handlers_test.go
index dfbe729b..3e72843a 100644
--- a/pkg/webservice/handlers_test.go
+++ b/pkg/webservice/handlers_test.go
@@ -2064,6 +2064,11 @@ func TestGetEvents(t *testing.T) {
        checkIllegalBatchRequest(t, "count=0", `0 is not a valid value for "count`)
        checkIllegalBatchRequest(t, "start=xyz", `strconv.ParseUint: parsing "xyz": invalid syntax`)
        checkIllegalBatchRequest(t, "start=-100", `strconv.ParseUint: parsing "-100": invalid syntax`)
+
+       // "count" too high
+       maxRESTResponseSize.Store(1)
+       defer maxRESTResponseSize.Store(configs.DefaultRESTResponseSize)
+       checkSingleEvent(t, appEvent, "count=3")
 }
 
 func TestGetEventsWhenTrackingDisabled(t *testing.T) {
@@ -2681,6 +2686,34 @@ func TestGetPartitionRuleHandler(t *testing.T) {
        assert.Equal(t, partitionRules[3].Name, types.Recovery)
 }
 
+func TestSetMaxRESTResponseSize(t *testing.T) {
+       current := configs.GetConfigMap()
+       defer configs.SetConfigMap(current)
+
+       configs.SetConfigMap(map[string]string{
+               configs.CMRESTResponseSize: "1234",
+       })
+       assert.Equal(t, uint64(1234), maxRESTResponseSize.Load())
+
+       configs.SetConfigMap(map[string]string{})
+       assert.Equal(t, uint64(10000), maxRESTResponseSize.Load())
+
+       configs.SetConfigMap(map[string]string{
+               configs.CMRESTResponseSize: "xyz",
+       })
+       assert.Equal(t, uint64(10000), maxRESTResponseSize.Load())
+
+       configs.SetConfigMap(map[string]string{
+               configs.CMRESTResponseSize: "0",
+       })
+       assert.Equal(t, uint64(10000), maxRESTResponseSize.Load())
+
+       configs.SetConfigMap(map[string]string{
+               configs.CMRESTResponseSize: "-1",
+       })
+       assert.Equal(t, uint64(10000), maxRESTResponseSize.Load())
+}
+
 type ResponseRecorderWithDeadline struct {
        *httptest.ResponseRecorder
        setWriteFails   bool


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to