This is an automated email from the ASF dual-hosted git repository.

wilfred-s pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/yunikorn-k8shim.git


The following commit(s) were added to refs/heads/master by this push:
     new fa00b094 [YUNIKORN-2576] Fix data race in dispatcher test (#1034)
fa00b094 is described below

commit fa00b09407c3040bf46f6578ee75f6400c50768c
Author: sidbroski <[email protected]>
AuthorDate: Fri Jun 5 09:48:29 2026 +1000

    [YUNIKORN-2576] Fix data race in dispatcher test (#1034)
    
    Move the async dispatch settings and asyncDispatchCount from package
    variables into the Dispatcher struct, and make createDispatcher() stop the
    dispatcher and wait for in-flight async goroutines to exit before
    reinitialisation. This prevents tests from racing on shared state.
    
    Verified with 'go test ./pkg/dispatcher/... -race -count=10 > shim-race.txt'.
    
    Closes: #1034
    
    Signed-off-by: Wilfred Spiegelenburg <[email protected]>
---
 pkg/dispatcher/dispatch_test.go | 63 ++++++++++++++++++++++++++++-------------
 pkg/dispatcher/dispatcher.go    | 40 ++++++++++++++------------
 2 files changed, 65 insertions(+), 38 deletions(-)

diff --git a/pkg/dispatcher/dispatch_test.go b/pkg/dispatcher/dispatch_test.go
index 50aec6ae..163c8cf8 100644
--- a/pkg/dispatcher/dispatch_test.go
+++ b/pkg/dispatcher/dispatch_test.go
@@ -51,11 +51,15 @@ func (t TestAppEvent) GetArgs() []interface{} {
        return nil
 }
 
-const RunApplication string = "RunApplication"
+const (
+       RunApplication              = "RunApplication"
+       maxTestAsyncDispatchDrain   = 15 * time.Second
+       testAsyncDispatchPollPeriod = 100 * time.Millisecond
+)
 
 func TestRegisterEventHandler(t *testing.T) {
-       createDispatcher()
-       defer createDispatcher()
+       createDispatcher(t)
+       defer createDispatcher(t)
 
        RegisterEventHandler("TestAppHandler", EventTypeApp, func(obj 
interface{}) {})
        RegisterEventHandler("TestTaskHandler", EventTypeTask, func(obj 
interface{}) {})
@@ -103,8 +107,8 @@ func (a *appEventsRecorder) size() int {
 }
 
 func TestDispatcherStartStop(t *testing.T) {
-       createDispatcher()
-       defer createDispatcher()
+       createDispatcher(t)
+       defer createDispatcher(t)
        // thread safe
        recorder := &appEventsRecorder{
                apps: make([]string, 0),
@@ -157,8 +161,8 @@ func TestDispatcherStartStop(t *testing.T) {
 // Test sending events from multiple senders in parallel,
 // verify that events won't be lost
 func TestEventWillNotBeLostWhenEventChannelIsFull(t *testing.T) {
-       createDispatcher()
-       defer createDispatcher()
+       createDispatcher(t)
+       defer createDispatcher(t)
        dispatcher.eventChan = make(chan events.SchedulingEvent, 1)
 
        // thread safe
@@ -187,7 +191,7 @@ func TestEventWillNotBeLostWhenEventChannelIsFull(t 
*testing.T) {
        }
 
        // check event channel is full and some events are dispatched 
asynchronously
-       assert.Assert(t, asyncDispatchCount.Load() > 0)
+       assert.Assert(t, dispatcher.asyncDispatchCount.Load() > 0)
 
        // wait until all events are handled
        dispatcher.drain()
@@ -197,7 +201,7 @@ func TestEventWillNotBeLostWhenEventChannelIsFull(t 
*testing.T) {
 
        // assert all event are handled
        assert.Equal(t, recorder.size(), numEvents)
-       assert.Assert(t, asyncDispatchCount.Load() == 0)
+       assert.Assert(t, dispatcher.asyncDispatchCount.Load() == 0)
 
        // ensure state is stopped
        assert.Equal(t, dispatcher.isRunning(), false)
@@ -206,12 +210,12 @@ func TestEventWillNotBeLostWhenEventChannelIsFull(t 
*testing.T) {
 // Test dispatch timeout, verify that Dispatcher#asyncDispatch is called when 
event channel is full
 // and will disappear after timeout.
 func TestDispatchTimeout(t *testing.T) {
-       createDispatcher()
-       defer createDispatcher()
+       createDispatcher(t)
+       defer createDispatcher(t)
        // reset event channel with small capacity for testing
        dispatcher.eventChan = make(chan events.SchedulingEvent, 1)
-       AsyncDispatchCheckInterval = 100 * time.Millisecond
-       DispatchTimeout = 500 * time.Millisecond
+       dispatcher.asyncDispatchCheckInterval = 100 * time.Millisecond
+       dispatcher.dispatchTimeout = 500 * time.Millisecond
 
        // start the handler, but waiting on a flag
        RegisterEventHandler("TestAppHandler", EventTypeApp, func(obj 
interface{}) {
@@ -240,7 +244,7 @@ func TestDispatchTimeout(t *testing.T) {
        // 2nd one should be added to the channel
        // 3rd one should be posted as an async request
        time.Sleep(100 * time.Millisecond)
-       assert.Equal(t, asyncDispatchCount.Load(), int32(1))
+       assert.Equal(t, dispatcher.asyncDispatchCount.Load(), int32(1))
 
        // verify Dispatcher#asyncDispatch is called
        buf := make([]byte, 1<<16)
@@ -249,8 +253,8 @@ func TestDispatchTimeout(t *testing.T) {
 
        // wait until async dispatch routine times out
        err := utils.WaitForCondition(func() bool {
-               return asyncDispatchCount.Load() == int32(0)
-       }, 100*time.Millisecond, DispatchTimeout+AsyncDispatchCheckInterval)
+               return dispatcher.asyncDispatchCount.Load() == 0
+       }, testAsyncDispatchPollPeriod, 
dispatcher.dispatchTimeout+dispatcher.asyncDispatchCheckInterval)
        assert.NilError(t, err)
 
        // verify no left-over thread
@@ -265,12 +269,12 @@ func TestDispatchTimeout(t *testing.T) {
 
 // Test exceeding the async-dispatch limit, should panic immediately.
 func TestExceedAsyncDispatchLimit(t *testing.T) {
-       createDispatcher()
-       defer createDispatcher()
+       createDispatcher(t)
+       defer createDispatcher(t)
 
        // reset event channel with small capacity for testing
        dispatcher.eventChan = make(chan events.SchedulingEvent, 1)
-       AsyncDispatchLimit = 1
+       dispatcher.asyncDispatchLimit = 1
        // pretend to be an time-consuming event-handler
        RegisterEventHandler("TestAppHandler", EventTypeApp, func(obj 
interface{}) {
                if _, ok := obj.(events.ApplicationEvent); ok {
@@ -301,7 +305,26 @@ func TestExceedAsyncDispatchLimit(t *testing.T) {
        }
 }
 
-func createDispatcher() {
+// createDispatcher resets the dispatcher for a clean test state. It first 
drains
+// any async dispatch goroutines left by a previous test before reinitialising,
+// failing the test if they do not exit in time.
+func createDispatcher(t *testing.T) {
+       if dispatcher != nil {
+               d := dispatcher
+               if d.isRunning() {
+                       Stop()
+               }
+               if d.asyncDispatchCount.Load() > 0 {
+                       waitTimeout := d.dispatchTimeout + 
d.asyncDispatchCheckInterval + time.Second
+                       if waitTimeout > maxTestAsyncDispatchDrain {
+                               waitTimeout = maxTestAsyncDispatchDrain
+                       }
+                       err := utils.WaitForCondition(func() bool {
+                               return d.asyncDispatchCount.Load() == 0
+                       }, testAsyncDispatchPollPeriod, waitTimeout)
+                       assert.NilError(t, err)
+               }
+       }
        once.Do(func() {}) // run nop, so that functions like 
RegisterEventHandler() won't run initDispatcher() again
        initDispatcher()
 }
diff --git a/pkg/dispatcher/dispatcher.go b/pkg/dispatcher/dispatcher.go
index 6b72346d..58db85a9 100644
--- a/pkg/dispatcher/dispatcher.go
+++ b/pkg/dispatcher/dispatcher.go
@@ -35,6 +35,8 @@ import (
 var dispatcher *Dispatcher
 var once sync.Once
 
+const defaultAsyncDispatchCheckInterval = 3 * time.Second
+
 type EventType int8
 
 const (
@@ -43,13 +45,6 @@ const (
        EventTypeNode
 )
 
-var (
-       AsyncDispatchLimit         int32
-       AsyncDispatchCheckInterval = 3 * time.Second
-       DispatchTimeout            time.Duration
-       asyncDispatchCount         atomic.Int32 = atomic.Int32{}
-)
-
 // central dispatcher that dispatches scheduling events.
 type Dispatcher struct {
        eventChan chan events.SchedulingEvent
@@ -58,6 +53,11 @@ type Dispatcher struct {
        running   atomic.Bool
        lock      locking.RWMutex
        stopped   sync.WaitGroup
+
+       asyncDispatchLimit         int32
+       asyncDispatchCheckInterval time.Duration
+       dispatchTimeout            time.Duration
+       asyncDispatchCount         atomic.Int32
 }
 
 func initDispatcher() {
@@ -67,15 +67,17 @@ func initDispatcher() {
                handlers:  make(map[EventType]map[string]func(interface{})),
                stopChan:  make(chan struct{}),
                lock:      locking.RWMutex{},
+
+               asyncDispatchCheckInterval: defaultAsyncDispatchCheckInterval,
+               dispatchTimeout:            
conf.GetSchedulerConf().DispatchTimeout,
+               asyncDispatchLimit:         max(10000, 
int32(eventChannelCapacity/10)), //nolint:gosec
        }
        dispatcher.setRunning(false)
-       DispatchTimeout = conf.GetSchedulerConf().DispatchTimeout
-       AsyncDispatchLimit = max(10000, int32(eventChannelCapacity/10)) 
//nolint:gosec
 
        log.Log(log.ShimDispatcher).Info("Init dispatcher",
                zap.Int("EventChannelCapacity", eventChannelCapacity),
-               zap.Int32("AsyncDispatchLimit", AsyncDispatchLimit),
-               zap.Float64("DispatchTimeoutInSeconds", 
DispatchTimeout.Seconds()))
+               zap.Int32("AsyncDispatchLimit", dispatcher.asyncDispatchLimit),
+               zap.Float64("DispatchTimeoutInSeconds", 
dispatcher.dispatchTimeout.Seconds()))
 }
 
 func RegisterEventHandler(handlerID string, eventType EventType, handlerFn 
func(interface{})) {
@@ -165,26 +167,28 @@ func (p *Dispatcher) dispatch(event 
events.SchedulingEvent) error {
        }
 }
 
-// async-dispatch try to enqueue the event in every 3 seconds util timeout,
-// it's only called when event channel is full.
+// async-dispatch retries enqueueing the event in every 3 seconds until 
dispatchTimeout
+// it's only called when the event channel is full.
 func (p *Dispatcher) asyncDispatch(event events.SchedulingEvent) {
-       count := asyncDispatchCount.Add(1)
+       count := p.asyncDispatchCount.Add(1)
        log.Log(log.ShimDispatcher).Warn("event channel is full, transition to 
async-dispatch mode",
                zap.Int32("asyncDispatchCount", count))
-       if count > AsyncDispatchLimit {
+       if count > p.asyncDispatchLimit {
+               // this event is not being async-dispatched, so undo the count
+               p.asyncDispatchCount.Add(-1)
                panic(fmt.Errorf("dispatcher exceeds async-dispatch limit"))
        }
        go func(beginTime time.Time, stop chan struct{}) {
-               defer asyncDispatchCount.Add(-1)
+               defer p.asyncDispatchCount.Add(-1)
                for p.isRunning() {
                        select {
                        case <-stop:
                                return
                        case p.eventChan <- event:
                                return
-                       case <-time.After(AsyncDispatchCheckInterval):
+                       case <-time.After(p.asyncDispatchCheckInterval):
                                elapseTime := time.Since(beginTime)
-                               if elapseTime >= DispatchTimeout {
+                               if elapseTime >= p.dispatchTimeout {
                                        
log.Log(log.ShimDispatcher).Error("dispatch timeout",
                                                zap.Float64("elapseSeconds", 
elapseTime.Seconds()))
                                        return


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to