This is an automated email from the ASF dual-hosted git repository.

wilfreds pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/yunikorn-core.git


The following commit(s) were added to refs/heads/master by this push:
     new 47ca6fed [YUNIKORN-2073] add rate limit for sendAppDoesNotFitEvent (#688)
47ca6fed is described below

commit 47ca6fed3164f425216d0b6bc2875bf177da4bae
Author: PoAn Yang <[email protected]>
AuthorDate: Thu Nov 2 16:57:25 2023 +1100

    [YUNIKORN-2073] add rate limit for sendAppDoesNotFitEvent (#688)
    
    Signed-off-by: PoAn Yang <[email protected]>
    
    Closes: #688
    
    Signed-off-by: Wilfred Spiegelenburg <[email protected]>
---
 pkg/scheduler/objects/application_events.go      |  7 ++++++-
 pkg/scheduler/objects/application_events_test.go | 22 ++++++++++++++++++++++
 2 files changed, 28 insertions(+), 1 deletion(-)

diff --git a/pkg/scheduler/objects/application_events.go b/pkg/scheduler/objects/application_events.go
index 075e889e..56100290 100644
--- a/pkg/scheduler/objects/application_events.go
+++ b/pkg/scheduler/objects/application_events.go
@@ -20,6 +20,9 @@ package objects
 
 import (
        "fmt"
+       "time"
+
+       "golang.org/x/time/rate"
 
        "github.com/apache/yunikorn-core/pkg/common"
        "github.com/apache/yunikorn-core/pkg/events"
@@ -29,10 +32,11 @@ import (
 type applicationEvents struct {
        eventSystem events.EventSystem
        app         *Application
+       limiter     *rate.Limiter
 }
 
 func (evt *applicationEvents) sendAppDoesNotFitEvent(request *AllocationAsk) {
-       if !evt.eventSystem.IsEventTrackingEnabled() {
+       if !evt.eventSystem.IsEventTrackingEnabled() || !evt.limiter.Allow() {
                return
        }
        message := fmt.Sprintf("Application %s does not fit into %s queue", request.GetApplicationID(), evt.app.queuePath)
@@ -132,5 +136,6 @@ func newApplicationEvents(app *Application, evt events.EventSystem) *application
        return &applicationEvents{
                eventSystem: evt,
                app:         app,
+               limiter:     rate.NewLimiter(rate.Every(time.Second), 1),
        }
 }
diff --git a/pkg/scheduler/objects/application_events_test.go b/pkg/scheduler/objects/application_events_test.go
index 7fe6b004..9d72a8bc 100644
--- a/pkg/scheduler/objects/application_events_test.go
+++ b/pkg/scheduler/objects/application_events_test.go
@@ -20,6 +20,7 @@ package objects
 
 import (
        "testing"
+       "time"
 
        "gotest.tools/v3/assert"
 
@@ -66,6 +67,27 @@ func TestSendAppDoesNotFitEvent(t *testing.T) {
        assert.Equal(t, 1, len(mock.events), "event was not generated")
 }
 
+func TestSendAppDoesNotFitEventWithRateLimiter(t *testing.T) {
+       app := &Application{
+               queuePath: "root.test",
+       }
+       mock := newEventSystemMock()
+       appEvents := newApplicationEvents(app, mock)
+       startTime := time.Now()
+       for {
+               elapsed := time.Since(startTime)
+               if elapsed > 500*time.Millisecond {
+                       break
+               }
+               appEvents.sendAppDoesNotFitEvent(&AllocationAsk{
+                       applicationID: appID0,
+                       allocationKey: aKey,
+               })
+               time.Sleep(10 * time.Millisecond)
+       }
+       assert.Equal(t, 1, len(mock.events), "event was not generated")
+}
+
 func TestSendPlaceholderLargerEvent(t *testing.T) {
        app := &Application{
                queuePath: "root.test",


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to