This is an automated email from the ASF dual-hosted git repository.

sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git


The following commit(s) were added to refs/heads/develop by this push:
     new d9584bcde7 fix(plc4go/spi): properly shutdown worker spawner and killer on shutdown
d9584bcde7 is described below

commit d9584bcde767b716a46f1554b2cf90f79335bab9
Author: Sebastian Rühl <[email protected]>
AuthorDate: Fri Jun 2 12:22:32 2023 +0200

    fix(plc4go/spi): properly shutdown worker spawner and killer on shutdown
---
 plc4go/spi/pool/WorkerPool.go      | 183 +++++++++++++++++++++----------------
 plc4go/spi/pool/WorkerPool_test.go |  18 ++--
 2 files changed, 112 insertions(+), 89 deletions(-)

diff --git a/plc4go/spi/pool/WorkerPool.go b/plc4go/spi/pool/WorkerPool.go
index c1881f1297..c9c031efa1 100644
--- a/plc4go/spi/pool/WorkerPool.go
+++ b/plc4go/spi/pool/WorkerPool.go
@@ -35,11 +35,14 @@ import (
 type Runnable func()
 
 type worker struct {
-       id           int
-       shutdown     atomic.Bool
-       interrupted  atomic.Bool
-       interrupter  chan struct{}
-       executor     *executor
+       id          int
+       shutdown    atomic.Bool
+       interrupted atomic.Bool
+       interrupter chan struct{}
+       executor    interface {
+               isTraceWorkers() bool
+               getWorksItems() chan workItem
+       }
        hasEnded     atomic.Bool
        lastReceived time.Time
 
@@ -65,7 +68,7 @@ func (w *worker) work() {
                }
        }()
        workerLog := w.log.With().Int("Worker id", w.id).Logger()
-       if !w.executor.traceWorkers {
+       if !w.executor.isTraceWorkers() {
                workerLog = zerolog.Nop()
        }
        workerLog.Debug().Msgf("current ended state: %t", w.hasEnded.Load())
@@ -75,7 +78,7 @@ func (w *worker) work() {
        for !w.shutdown.Load() {
                workerLog.Debug().Msg("Working")
                select {
-               case _workItem := <-w.executor.workItems:
+               case _workItem := <-w.executor.getWorksItems():
                        w.lastReceived = time.Now()
                        workerLog.Debug().Msgf("Got work item %v", _workItem)
                        if _workItem.completionFuture.cancelRequested.Load() || 
(w.shutdown.Load() && w.interrupted.Load()) {
@@ -125,6 +128,19 @@ type executor struct {
        log zerolog.Logger
 }
 
+func (e *executor) isTraceWorkers() bool {
+       return e.traceWorkers
+}
+
+func (e *executor) getWorksItems() chan workItem {
+       return e.workItems
+}
+
+type dynamicExecutor struct {
+       executor
+       maxNumberOfWorkers int
+}
+
 func NewFixedSizeExecutor(numberOfWorkers, queueDepth int, _options ...options.WithOption) Executor {
        workers := make([]*worker, numberOfWorkers)
        customLogger := options.ExtractCustomLogger(_options...)
@@ -158,11 +174,14 @@ var timeToBecomeUnused = 5 * time.Second
 
 func NewDynamicExecutor(maxNumberOfWorkers, queueDepth int, _options ...options.WithOption) Executor {
        customLogger := options.ExtractCustomLogger(_options...)
-       _executor := &executor{
+       _executor := &dynamicExecutor{
+               executor: executor{
+                       maxNumberOfWorkers: maxNumberOfWorkers,
+                       workItems:          make(chan workItem, queueDepth),
+                       worker:             make([]*worker, 0),
+                       log:                customLogger,
+               },
                maxNumberOfWorkers: maxNumberOfWorkers,
-               workItems:          make(chan workItem, queueDepth),
-               worker:             make([]*worker, 0),
-               log:                customLogger,
        }
        for _, option := range _options {
                switch option := option.(type) {
@@ -178,75 +197,6 @@ func NewDynamicExecutor(maxNumberOfWorkers, queueDepth int, _options ...options.
                lastReceived: time.Now(),
                log:          customLogger,
        })
-       mutex := sync.Mutex{}
-       // Worker spawner
-       go func() {
-               defer func() {
-                       if err := recover(); err != nil {
-                               customLogger.Error().Msgf("panic-ed %v", err)
-                       }
-               }()
-               workerLog := customLogger.With().Str("Worker type", 
"spawner").Logger()
-               if !_executor.traceWorkers {
-                       workerLog = zerolog.Nop()
-               }
-               for {
-                       workerLog.Debug().Msgf("Sleeping for %v", 
upScaleInterval)
-                       time.Sleep(upScaleInterval)
-                       mutex.Lock()
-                       numberOfItemsInQueue := len(_executor.workItems)
-                       numberOfWorkers := len(_executor.worker)
-                       workerLog.Debug().Msgf("Checking if %d > %d && %d < 
%d", numberOfItemsInQueue, numberOfWorkers, numberOfWorkers, maxNumberOfWorkers)
-                       if numberOfItemsInQueue > numberOfWorkers && 
numberOfWorkers < maxNumberOfWorkers {
-                               _worker := &worker{
-                                       id:           numberOfWorkers - 1,
-                                       interrupter:  make(chan struct{}, 1),
-                                       executor:     _executor,
-                                       lastReceived: time.Now(),
-                                       log:          customLogger,
-                               }
-                               _executor.worker = append(_executor.worker, 
_worker)
-                               _worker.initialize()
-                               workerLog.Info().Int("Worker id", 
_worker.id).Msg("spawning")
-                               go _worker.work()
-                       } else {
-                               workerLog.Trace().Msg("Nothing to scale")
-                       }
-                       mutex.Unlock()
-               }
-       }()
-       // Worker killer
-       go func() {
-               defer func() {
-                       if err := recover(); err != nil {
-                               _executor.log.Error().Msgf("panic-ed %v", err)
-                       }
-               }()
-               workerLog := customLogger.With().Str("Worker type", 
"killer").Logger()
-               if !_executor.traceWorkers {
-                       workerLog = zerolog.Nop()
-               }
-               for {
-                       workerLog.Debug().Msgf("Sleeping for %v", 
downScaleInterval)
-                       time.Sleep(downScaleInterval)
-                       mutex.Lock()
-                       newWorkers := make([]*worker, 0)
-                       for _, _worker := range _executor.worker {
-                               deadline := time.Now().Add(-timeToBecomeUnused)
-                               workerLog.Debug().Int("Worker id", 
_worker.id).Msgf("Checking if %v is before %v", _worker.lastReceived, deadline)
-                               if _worker.lastReceived.Before(deadline) {
-                                       workerLog.Info().Int("Worker id", 
_worker.id).Msg("killing")
-                                       _worker.interrupted.Store(true)
-                                       close(_worker.interrupter)
-                               } else {
-                                       workerLog.Debug().Int("Worker id", 
_worker.id).Msg("still ok")
-                                       newWorkers = append(newWorkers, _worker)
-                               }
-                       }
-                       _executor.worker = newWorkers
-                       mutex.Unlock()
-               }
-       }()
        return _executor
 }
 
@@ -301,6 +251,79 @@ func (e *executor) Start() {
        }
 }
 
+func (e *dynamicExecutor) Start() {
+       e.executor.Start()
+       mutex := sync.Mutex{}
+       // Worker spawner
+       go func() {
+               defer func() {
+                       if err := recover(); err != nil {
+                               e.log.Error().Msgf("panic-ed %v", err)
+                       }
+               }()
+               workerLog := e.log.With().Str("Worker type", "spawner").Logger()
+               if !e.traceWorkers {
+                       workerLog = zerolog.Nop()
+               }
+               for e.running && !e.shutdown {
+                       workerLog.Debug().Msgf("Sleeping for %v", 
upScaleInterval)
+                       time.Sleep(upScaleInterval)
+                       mutex.Lock()
+                       numberOfItemsInQueue := len(e.workItems)
+                       numberOfWorkers := len(e.worker)
+                       workerLog.Debug().Msgf("Checking if %d > %d && %d < 
%d", numberOfItemsInQueue, numberOfWorkers, numberOfWorkers, 
e.maxNumberOfWorkers)
+                       if numberOfItemsInQueue > numberOfWorkers && 
numberOfWorkers < e.maxNumberOfWorkers {
+                               _worker := &worker{
+                                       id:           numberOfWorkers - 1,
+                                       interrupter:  make(chan struct{}, 1),
+                                       executor:     e,
+                                       lastReceived: time.Now(),
+                                       log:          e.log,
+                               }
+                               e.worker = append(e.worker, _worker)
+                               _worker.initialize()
+                               workerLog.Info().Int("Worker id", 
_worker.id).Msg("spawning")
+                               go _worker.work()
+                       } else {
+                               workerLog.Trace().Msg("Nothing to scale")
+                       }
+                       mutex.Unlock()
+               }
+       }()
+       // Worker killer
+       go func() {
+               defer func() {
+                       if err := recover(); err != nil {
+                               e.log.Error().Msgf("panic-ed %v", err)
+                       }
+               }()
+               workerLog := e.log.With().Str("Worker type", "killer").Logger()
+               if !e.traceWorkers {
+                       workerLog = zerolog.Nop()
+               }
+               for e.running && !e.shutdown {
+                       workerLog.Debug().Msgf("Sleeping for %v", 
downScaleInterval)
+                       time.Sleep(downScaleInterval)
+                       mutex.Lock()
+                       newWorkers := make([]*worker, 0)
+                       for _, _worker := range e.worker {
+                               deadline := time.Now().Add(-timeToBecomeUnused)
+                               workerLog.Debug().Int("Worker id", 
_worker.id).Msgf("Checking if %v is before %v", _worker.lastReceived, deadline)
+                               if _worker.lastReceived.Before(deadline) {
+                                       workerLog.Info().Int("Worker id", 
_worker.id).Msg("killing")
+                                       _worker.interrupted.Store(true)
+                                       close(_worker.interrupter)
+                               } else {
+                                       workerLog.Debug().Int("Worker id", 
_worker.id).Msg("still ok")
+                                       newWorkers = append(newWorkers, _worker)
+                               }
+                       }
+                       e.worker = newWorkers
+                       mutex.Unlock()
+               }
+       }()
+}
+
 func (e *executor) Stop() {
        e.stateChange.Lock()
        defer e.stateChange.Unlock()
diff --git a/plc4go/spi/pool/WorkerPool_test.go b/plc4go/spi/pool/WorkerPool_test.go
index d19448ec6e..fc64be079b 100644
--- a/plc4go/spi/pool/WorkerPool_test.go
+++ b/plc4go/spi/pool/WorkerPool_test.go
@@ -279,8 +279,8 @@ func TestNewDynamicExecutor(t *testing.T) {
                name              string
                args              args
                setup             func(*testing.T, *args)
-               manipulator       func(*testing.T, *executor)
-               executorValidator func(*testing.T, *executor) bool
+               manipulator       func(*testing.T, *dynamicExecutor)
+               executorValidator func(*testing.T, *dynamicExecutor) bool
        }{
                {
                        name: "new Executor",
@@ -292,7 +292,7 @@ func TestNewDynamicExecutor(t *testing.T) {
                        setup: func(t *testing.T, args *args) {
                                args.options = append(args.options, 
options.WithCustomLogger(zerolog.New(zerolog.NewConsoleWriter(zerolog.ConsoleTestWriter(t)))))
                        },
-                       executorValidator: func(t *testing.T, e *executor) bool 
{
+                       executorValidator: func(t *testing.T, e 
*dynamicExecutor) bool {
                                assert.False(t, e.running)
                                assert.False(t, e.shutdown)
                                assert.Len(t, e.worker, 1)
@@ -310,7 +310,7 @@ func TestNewDynamicExecutor(t *testing.T) {
                        setup: func(t *testing.T, args *args) {
                                args.options = append(args.options, 
options.WithCustomLogger(zerolog.New(zerolog.NewConsoleWriter(zerolog.ConsoleTestWriter(t)))))
                        },
-                       manipulator: func(t *testing.T, e *executor) {
+                       manipulator: func(t *testing.T, e *dynamicExecutor) {
                                {
                                        oldUpScaleInterval := upScaleInterval
                                        t.Cleanup(func() {
@@ -354,7 +354,7 @@ func TestNewDynamicExecutor(t *testing.T) {
                                        }
                                }()
                        },
-                       executorValidator: func(t *testing.T, e *executor) bool 
{
+                       executorValidator: func(t *testing.T, e 
*dynamicExecutor) bool {
                                time.Sleep(500 * time.Millisecond)
                                assert.False(t, e.running)
                                assert.False(t, e.shutdown)
@@ -367,12 +367,12 @@ func TestNewDynamicExecutor(t *testing.T) {
                        if tt.setup != nil {
                                tt.setup(t, &tt.args)
                        }
-                       fixedSizeExecutor := 
NewDynamicExecutor(tt.args.numberOfWorkers, tt.args.queueDepth, 
tt.args.options...)
-                       defer fixedSizeExecutor.Stop()
+                       dynamicSizedExecutor := 
NewDynamicExecutor(tt.args.numberOfWorkers, tt.args.queueDepth, 
tt.args.options...)
+                       defer dynamicSizedExecutor.Stop()
                        if tt.manipulator != nil {
-                               tt.manipulator(t, fixedSizeExecutor.(*executor))
+                               tt.manipulator(t, 
dynamicSizedExecutor.(*dynamicExecutor))
                        }
-                       assert.True(t, tt.executorValidator(t, 
fixedSizeExecutor.(*executor)), "NewFixedSizeExecutor(%v, %v, %v)", 
tt.args.numberOfWorkers, tt.args.queueDepth, tt.args.options)
+                       assert.True(t, tt.executorValidator(t, 
dynamicSizedExecutor.(*dynamicExecutor)), "NewFixedSizeExecutor(%v, %v, %v)", 
tt.args.numberOfWorkers, tt.args.queueDepth, tt.args.options)
                })
        }
 }

Reply via email to