This is an automated email from the ASF dual-hosted git repository.

pbacsko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/yunikorn-k8shim.git


The following commit(s) were added to refs/heads/master by this push:
     new aae4c35e [YUNIKORN-2525] dispatcher.Stop() waits an extra second unnecessarily (#811)
aae4c35e is described below

commit aae4c35ee8dd86819256bc1fac9babf223fccc9e
Author: Peter Bacsko <[email protected]>
AuthorDate: Thu Apr 4 20:48:21 2024 +0200

    [YUNIKORN-2525] dispatcher.Stop() waits an extra second unnecessarily (#811)
    
    Closes: #811
    
    Signed-off-by: Peter Bacsko <[email protected]>
---
 pkg/dispatcher/dispatcher.go | 24 ++++++++++++++++++------
 1 file changed, 18 insertions(+), 6 deletions(-)

diff --git a/pkg/dispatcher/dispatcher.go b/pkg/dispatcher/dispatcher.go
index f2727143..51e46ec6 100644
--- a/pkg/dispatcher/dispatcher.go
+++ b/pkg/dispatcher/dispatcher.go
@@ -56,6 +56,7 @@ type Dispatcher struct {
        handlers  map[EventType]map[string]func(interface{})
        running   atomic.Value
        lock      sync.RWMutex
+       stopped   sync.WaitGroup
 }
 
 func initDispatcher() {
@@ -211,6 +212,7 @@ func Start() {
                return
        }
        getDispatcher().stopChan = make(chan struct{})
+       getDispatcher().stopped.Add(1)
        go func() {
                for {
                        select {
@@ -229,6 +231,7 @@ func Start() {
                        case <-getDispatcher().stopChan:
                                log.Log(log.ShimDispatcher).Info("shutting down event channel")
                                getDispatcher().setRunning(false)
+                               getDispatcher().stopped.Done()
                                return
                        }
                }
@@ -257,13 +260,22 @@ func Stop() {
        }
 
        close(getDispatcher().stopChan)
-       maxTimeout := 5
-       for getDispatcher().isRunning() && maxTimeout > 0 {
-               log.Log(log.ShimDispatcher).Info("waiting for dispatcher to be stopped",
-                       zap.Int("remainingSeconds", maxTimeout))
-               time.Sleep(1 * time.Second)
-               maxTimeout--
+       stopWait := make(chan struct{})
+
+       go func() {
+               defer close(stopWait)
+               getDispatcher().stopped.Wait()
+       }()
+
+       // wait until the main event loop stops properly
+       select {
+       case <-stopWait:
+               break
+       case <-time.After(5 * time.Second):
+               log.Log(log.ShimDispatcher).Info("dispatcher did not stop in time")
+               break
        }
+
        if getDispatcher().isRunning() {
                log.Log(log.ShimDispatcher).Warn("dispatcher even processing did not stop properly")
        } else {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to