This is an automated email from the ASF dual-hosted git repository.
pbacsko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/yunikorn-k8shim.git
The following commit(s) were added to refs/heads/master by this push:
new aae4c35e [YUNIKORN-2525] dispatcher.Stop() waits an extra second
unnecessarily (#811)
aae4c35e is described below
commit aae4c35ee8dd86819256bc1fac9babf223fccc9e
Author: Peter Bacsko <[email protected]>
AuthorDate: Thu Apr 4 20:48:21 2024 +0200
[YUNIKORN-2525] dispatcher.Stop() waits an extra second unnecessarily (#811)
Closes: #811
Signed-off-by: Peter Bacsko <[email protected]>
---
pkg/dispatcher/dispatcher.go | 24 ++++++++++++++++++------
1 file changed, 18 insertions(+), 6 deletions(-)
diff --git a/pkg/dispatcher/dispatcher.go b/pkg/dispatcher/dispatcher.go
index f2727143..51e46ec6 100644
--- a/pkg/dispatcher/dispatcher.go
+++ b/pkg/dispatcher/dispatcher.go
@@ -56,6 +56,7 @@ type Dispatcher struct {
handlers map[EventType]map[string]func(interface{})
running atomic.Value
lock sync.RWMutex
+ stopped sync.WaitGroup
}
func initDispatcher() {
@@ -211,6 +212,7 @@ func Start() {
return
}
getDispatcher().stopChan = make(chan struct{})
+ getDispatcher().stopped.Add(1)
go func() {
for {
select {
@@ -229,6 +231,7 @@ func Start() {
case <-getDispatcher().stopChan:
log.Log(log.ShimDispatcher).Info("shutting down
event channel")
getDispatcher().setRunning(false)
+ getDispatcher().stopped.Done()
return
}
}
@@ -257,13 +260,22 @@ func Stop() {
}
close(getDispatcher().stopChan)
- maxTimeout := 5
- for getDispatcher().isRunning() && maxTimeout > 0 {
- log.Log(log.ShimDispatcher).Info("waiting for dispatcher to be
stopped",
- zap.Int("remainingSeconds", maxTimeout))
- time.Sleep(1 * time.Second)
- maxTimeout--
+ stopWait := make(chan struct{})
+
+ go func() {
+ defer close(stopWait)
+ getDispatcher().stopped.Wait()
+ }()
+
+ // wait until the main event loop stops properly
+ select {
+ case <-stopWait:
+ break
+ case <-time.After(5 * time.Second):
+ log.Log(log.ShimDispatcher).Info("dispatcher did not stop in
time")
+ break
}
+
if getDispatcher().isRunning() {
log.Log(log.ShimDispatcher).Warn("dispatcher even processing
did not stop properly")
} else {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]