This is an automated email from the ASF dual-hosted git repository.
wilfreds pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/yunikorn-core.git
The following commit(s) were added to refs/heads/master by this push:
new 47ca6fed [YUNIKORN-2073] add rate limit for sendAppDoesNotFitEvent
(#688)
47ca6fed is described below
commit 47ca6fed3164f425216d0b6bc2875bf177da4bae
Author: PoAn Yang <[email protected]>
AuthorDate: Thu Nov 2 16:57:25 2023 +1100
[YUNIKORN-2073] add rate limit for sendAppDoesNotFitEvent (#688)
Signed-off-by: PoAn Yang <[email protected]>
Closes: #688
Signed-off-by: Wilfred Spiegelenburg <[email protected]>
---
pkg/scheduler/objects/application_events.go | 7 ++++++-
pkg/scheduler/objects/application_events_test.go | 22 ++++++++++++++++++++++
2 files changed, 28 insertions(+), 1 deletion(-)
diff --git a/pkg/scheduler/objects/application_events.go
b/pkg/scheduler/objects/application_events.go
index 075e889e..56100290 100644
--- a/pkg/scheduler/objects/application_events.go
+++ b/pkg/scheduler/objects/application_events.go
@@ -20,6 +20,9 @@ package objects
import (
"fmt"
+ "time"
+
+ "golang.org/x/time/rate"
"github.com/apache/yunikorn-core/pkg/common"
"github.com/apache/yunikorn-core/pkg/events"
@@ -29,10 +32,11 @@ import (
type applicationEvents struct {
eventSystem events.EventSystem
app *Application
+ limiter *rate.Limiter
}
func (evt *applicationEvents) sendAppDoesNotFitEvent(request *AllocationAsk) {
- if !evt.eventSystem.IsEventTrackingEnabled() {
+ if !evt.eventSystem.IsEventTrackingEnabled() || !evt.limiter.Allow() {
return
}
message := fmt.Sprintf("Application %s does not fit into %s queue",
request.GetApplicationID(), evt.app.queuePath)
@@ -132,5 +136,6 @@ func newApplicationEvents(app *Application, evt
events.EventSystem) *application
return &applicationEvents{
eventSystem: evt,
app: app,
+ limiter: rate.NewLimiter(rate.Every(time.Second), 1),
}
}
diff --git a/pkg/scheduler/objects/application_events_test.go
b/pkg/scheduler/objects/application_events_test.go
index 7fe6b004..9d72a8bc 100644
--- a/pkg/scheduler/objects/application_events_test.go
+++ b/pkg/scheduler/objects/application_events_test.go
@@ -20,6 +20,7 @@ package objects
import (
"testing"
+ "time"
"gotest.tools/v3/assert"
@@ -66,6 +67,27 @@ func TestSendAppDoesNotFitEvent(t *testing.T) {
assert.Equal(t, 1, len(mock.events), "event was not generated")
}
+func TestSendAppDoesNotFitEventWithRateLimiter(t *testing.T) {
+ app := &Application{
+ queuePath: "root.test",
+ }
+ mock := newEventSystemMock()
+ appEvents := newApplicationEvents(app, mock)
+ startTime := time.Now()
+ for {
+ elapsed := time.Since(startTime)
+ if elapsed > 500*time.Millisecond {
+ break
+ }
+ appEvents.sendAppDoesNotFitEvent(&AllocationAsk{
+ applicationID: appID0,
+ allocationKey: aKey,
+ })
+ time.Sleep(10 * time.Millisecond)
+ }
+ assert.Equal(t, 1, len(mock.events), "event was not generated")
+}
+
func TestSendPlaceholderLargerEvent(t *testing.T) {
app := &Application{
queuePath: "root.test",
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]