This is an automated email from the ASF dual-hosted git repository.
wilfred-s pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/yunikorn-k8shim.git
The following commit(s) were added to refs/heads/master by this push:
new fa00b094 [YUNIKORN-2576] Fix data race in dispatcher test (#1034)
fa00b094 is described below
commit fa00b09407c3040bf46f6578ee75f6400c50768c
Author: sidbroski <[email protected]>
AuthorDate: Fri Jun 5 09:48:29 2026 +1000
[YUNIKORN-2576] Fix data race in dispatcher test (#1034)
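Moved the async dispatch settings and asyncDispatchCount from package variables
into the Dispatcher struct, and made createDispatcher() stop the dispatcher and
drain in-flight async-dispatch goroutines before reinitialising it. This
prevents tests from racing on shared state. asyncDispatch() now also undoes
its asyncDispatchCount increment when the async-dispatch limit is exceeded.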
Verified with 'go test ./pkg/dispatcher/... -race -count=10 > shim-race.txt'.
Closes: #1034
Signed-off-by: Wilfred Spiegelenburg <[email protected]>
---
pkg/dispatcher/dispatch_test.go | 63 ++++++++++++++++++++++++++++-------------
pkg/dispatcher/dispatcher.go | 40 ++++++++++++++------------
2 files changed, 65 insertions(+), 38 deletions(-)
diff --git a/pkg/dispatcher/dispatch_test.go b/pkg/dispatcher/dispatch_test.go
index 50aec6ae..163c8cf8 100644
--- a/pkg/dispatcher/dispatch_test.go
+++ b/pkg/dispatcher/dispatch_test.go
@@ -51,11 +51,15 @@ func (t TestAppEvent) GetArgs() []interface{} {
return nil
}
-const RunApplication string = "RunApplication"
+const (
+ RunApplication = "RunApplication"
+ maxTestAsyncDispatchDrain = 15 * time.Second
+ testAsyncDispatchPollPeriod = 100 * time.Millisecond
+)
func TestRegisterEventHandler(t *testing.T) {
- createDispatcher()
- defer createDispatcher()
+ createDispatcher(t)
+ defer createDispatcher(t)
RegisterEventHandler("TestAppHandler", EventTypeApp, func(obj
interface{}) {})
RegisterEventHandler("TestTaskHandler", EventTypeTask, func(obj
interface{}) {})
@@ -103,8 +107,8 @@ func (a *appEventsRecorder) size() int {
}
func TestDispatcherStartStop(t *testing.T) {
- createDispatcher()
- defer createDispatcher()
+ createDispatcher(t)
+ defer createDispatcher(t)
// thread safe
recorder := &appEventsRecorder{
apps: make([]string, 0),
@@ -157,8 +161,8 @@ func TestDispatcherStartStop(t *testing.T) {
// Test sending events from multiple senders in parallel,
// verify that events won't be lost
func TestEventWillNotBeLostWhenEventChannelIsFull(t *testing.T) {
- createDispatcher()
- defer createDispatcher()
+ createDispatcher(t)
+ defer createDispatcher(t)
dispatcher.eventChan = make(chan events.SchedulingEvent, 1)
// thread safe
@@ -187,7 +191,7 @@ func TestEventWillNotBeLostWhenEventChannelIsFull(t
*testing.T) {
}
// check event channel is full and some events are dispatched
asynchronously
- assert.Assert(t, asyncDispatchCount.Load() > 0)
+ assert.Assert(t, dispatcher.asyncDispatchCount.Load() > 0)
// wait until all events are handled
dispatcher.drain()
@@ -197,7 +201,7 @@ func TestEventWillNotBeLostWhenEventChannelIsFull(t
*testing.T) {
// assert all event are handled
assert.Equal(t, recorder.size(), numEvents)
- assert.Assert(t, asyncDispatchCount.Load() == 0)
+ assert.Assert(t, dispatcher.asyncDispatchCount.Load() == 0)
// ensure state is stopped
assert.Equal(t, dispatcher.isRunning(), false)
@@ -206,12 +210,12 @@ func TestEventWillNotBeLostWhenEventChannelIsFull(t
*testing.T) {
// Test dispatch timeout, verify that Dispatcher#asyncDispatch is called when
event channel is full
// and will disappear after timeout.
func TestDispatchTimeout(t *testing.T) {
- createDispatcher()
- defer createDispatcher()
+ createDispatcher(t)
+ defer createDispatcher(t)
// reset event channel with small capacity for testing
dispatcher.eventChan = make(chan events.SchedulingEvent, 1)
- AsyncDispatchCheckInterval = 100 * time.Millisecond
- DispatchTimeout = 500 * time.Millisecond
+ dispatcher.asyncDispatchCheckInterval = 100 * time.Millisecond
+ dispatcher.dispatchTimeout = 500 * time.Millisecond
// start the handler, but waiting on a flag
RegisterEventHandler("TestAppHandler", EventTypeApp, func(obj
interface{}) {
@@ -240,7 +244,7 @@ func TestDispatchTimeout(t *testing.T) {
// 2nd one should be added to the channel
// 3rd one should be posted as an async request
time.Sleep(100 * time.Millisecond)
- assert.Equal(t, asyncDispatchCount.Load(), int32(1))
+ assert.Equal(t, dispatcher.asyncDispatchCount.Load(), int32(1))
// verify Dispatcher#asyncDispatch is called
buf := make([]byte, 1<<16)
@@ -249,8 +253,8 @@ func TestDispatchTimeout(t *testing.T) {
// wait until async dispatch routine times out
err := utils.WaitForCondition(func() bool {
- return asyncDispatchCount.Load() == int32(0)
- }, 100*time.Millisecond, DispatchTimeout+AsyncDispatchCheckInterval)
+ return dispatcher.asyncDispatchCount.Load() == 0
+ }, testAsyncDispatchPollPeriod,
dispatcher.dispatchTimeout+dispatcher.asyncDispatchCheckInterval)
assert.NilError(t, err)
// verify no left-over thread
@@ -265,12 +269,12 @@ func TestDispatchTimeout(t *testing.T) {
// Test exceeding the async-dispatch limit, should panic immediately.
func TestExceedAsyncDispatchLimit(t *testing.T) {
- createDispatcher()
- defer createDispatcher()
+ createDispatcher(t)
+ defer createDispatcher(t)
// reset event channel with small capacity for testing
dispatcher.eventChan = make(chan events.SchedulingEvent, 1)
- AsyncDispatchLimit = 1
+ dispatcher.asyncDispatchLimit = 1
// pretend to be an time-consuming event-handler
RegisterEventHandler("TestAppHandler", EventTypeApp, func(obj
interface{}) {
if _, ok := obj.(events.ApplicationEvent); ok {
@@ -301,7 +305,26 @@ func TestExceedAsyncDispatchLimit(t *testing.T) {
}
}
-func createDispatcher() {
+// createDispatcher resets the dispatcher for a clean test state. It first
drains
+// any async dispatch goroutines left by a previous test before reinitialising,
+// failing the test if they do not exit in time.
+func createDispatcher(t *testing.T) {
+ if dispatcher != nil {
+ d := dispatcher
+ if d.isRunning() {
+ Stop()
+ }
+ if d.asyncDispatchCount.Load() > 0 {
+ waitTimeout := d.dispatchTimeout +
d.asyncDispatchCheckInterval + time.Second
+ if waitTimeout > maxTestAsyncDispatchDrain {
+ waitTimeout = maxTestAsyncDispatchDrain
+ }
+ err := utils.WaitForCondition(func() bool {
+ return d.asyncDispatchCount.Load() == 0
+ }, testAsyncDispatchPollPeriod, waitTimeout)
+ assert.NilError(t, err)
+ }
+ }
once.Do(func() {}) // run nop, so that functions like
RegisterEventHandler() won't run initDispatcher() again
initDispatcher()
}
diff --git a/pkg/dispatcher/dispatcher.go b/pkg/dispatcher/dispatcher.go
index 6b72346d..58db85a9 100644
--- a/pkg/dispatcher/dispatcher.go
+++ b/pkg/dispatcher/dispatcher.go
@@ -35,6 +35,8 @@ import (
var dispatcher *Dispatcher
var once sync.Once
+const defaultAsyncDispatchCheckInterval = 3 * time.Second
+
type EventType int8
const (
@@ -43,13 +45,6 @@ const (
EventTypeNode
)
-var (
- AsyncDispatchLimit int32
- AsyncDispatchCheckInterval = 3 * time.Second
- DispatchTimeout time.Duration
- asyncDispatchCount atomic.Int32 = atomic.Int32{}
-)
-
// central dispatcher that dispatches scheduling events.
type Dispatcher struct {
eventChan chan events.SchedulingEvent
@@ -58,6 +53,11 @@ type Dispatcher struct {
running atomic.Bool
lock locking.RWMutex
stopped sync.WaitGroup
+
+ asyncDispatchLimit int32
+ asyncDispatchCheckInterval time.Duration
+ dispatchTimeout time.Duration
+ asyncDispatchCount atomic.Int32
}
func initDispatcher() {
@@ -67,15 +67,17 @@ func initDispatcher() {
handlers: make(map[EventType]map[string]func(interface{})),
stopChan: make(chan struct{}),
lock: locking.RWMutex{},
+
+ asyncDispatchCheckInterval: defaultAsyncDispatchCheckInterval,
+ dispatchTimeout:
conf.GetSchedulerConf().DispatchTimeout,
+ asyncDispatchLimit: max(10000,
int32(eventChannelCapacity/10)), //nolint:gosec
}
dispatcher.setRunning(false)
- DispatchTimeout = conf.GetSchedulerConf().DispatchTimeout
- AsyncDispatchLimit = max(10000, int32(eventChannelCapacity/10))
//nolint:gosec
log.Log(log.ShimDispatcher).Info("Init dispatcher",
zap.Int("EventChannelCapacity", eventChannelCapacity),
- zap.Int32("AsyncDispatchLimit", AsyncDispatchLimit),
- zap.Float64("DispatchTimeoutInSeconds",
DispatchTimeout.Seconds()))
+ zap.Int32("AsyncDispatchLimit", dispatcher.asyncDispatchLimit),
+ zap.Float64("DispatchTimeoutInSeconds",
dispatcher.dispatchTimeout.Seconds()))
}
func RegisterEventHandler(handlerID string, eventType EventType, handlerFn
func(interface{})) {
@@ -165,26 +167,28 @@ func (p *Dispatcher) dispatch(event
events.SchedulingEvent) error {
}
}
-// async-dispatch try to enqueue the event in every 3 seconds util timeout,
-// it's only called when event channel is full.
+// async-dispatch retries enqueueing the event in every 3 seconds until
dispatchTimeout
+// it's only called when the event channel is full.
func (p *Dispatcher) asyncDispatch(event events.SchedulingEvent) {
- count := asyncDispatchCount.Add(1)
+ count := p.asyncDispatchCount.Add(1)
log.Log(log.ShimDispatcher).Warn("event channel is full, transition to
async-dispatch mode",
zap.Int32("asyncDispatchCount", count))
- if count > AsyncDispatchLimit {
+ if count > p.asyncDispatchLimit {
+ // this event is not being async-dispatched, so undo the count
+ p.asyncDispatchCount.Add(-1)
panic(fmt.Errorf("dispatcher exceeds async-dispatch limit"))
}
go func(beginTime time.Time, stop chan struct{}) {
- defer asyncDispatchCount.Add(-1)
+ defer p.asyncDispatchCount.Add(-1)
for p.isRunning() {
select {
case <-stop:
return
case p.eventChan <- event:
return
- case <-time.After(AsyncDispatchCheckInterval):
+ case <-time.After(p.asyncDispatchCheckInterval):
elapseTime := time.Since(beginTime)
- if elapseTime >= DispatchTimeout {
+ if elapseTime >= p.dispatchTimeout {
log.Log(log.ShimDispatcher).Error("dispatch timeout",
zap.Float64("elapseSeconds",
elapseTime.Seconds()))
return
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]