This is an automated email from the ASF dual-hosted git repository.

sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git


The following commit(s) were added to refs/heads/develop by this push:
     new 48c398c916 feat(plc4go/spi): use atomic.Bool for state changes on WorkerPool
48c398c916 is described below

commit 48c398c916a4dd07f7ee9024fc11b19db3330664
Author: Sebastian Rühl <[email protected]>
AuthorDate: Tue Mar 21 12:42:36 2023 +0100

    feat(plc4go/spi): use atomic.Bool for state changes on WorkerPool
---
 plc4go/spi/RequestTransactionManager.go |  2 +-
 plc4go/spi/utils/WorkerPool.go          | 52 +++++++++++++++------------------
 2 files changed, 25 insertions(+), 29 deletions(-)

diff --git a/plc4go/spi/RequestTransactionManager.go b/plc4go/spi/RequestTransactionManager.go
index a6b5268559..062d877e34 100644
--- a/plc4go/spi/RequestTransactionManager.go
+++ b/plc4go/spi/RequestTransactionManager.go
@@ -38,7 +38,7 @@ import (
 var sharedExecutorInstance utils.Executor // shared instance
 
 func init() {
-       sharedExecutorInstance = *utils.NewFixedSizeExecutor(runtime.NumCPU(), utils.WithExecutorOptionTracerWorkers(config.TraceTransactionManagerWorkers))
+       sharedExecutorInstance = *utils.NewFixedSizeExecutor(runtime.NumCPU(), 100, utils.WithExecutorOptionTracerWorkers(config.TraceTransactionManagerWorkers))
        sharedExecutorInstance.Start()
 }
 
diff --git a/plc4go/spi/utils/WorkerPool.go b/plc4go/spi/utils/WorkerPool.go
index 9dbb32b885..162737c215 100644
--- a/plc4go/spi/utils/WorkerPool.go
+++ b/plc4go/spi/utils/WorkerPool.go
@@ -25,6 +25,7 @@ import (
        "github.com/rs/zerolog"
        "github.com/rs/zerolog/log"
        "sync"
+       "sync/atomic"
        "time"
 )
 
@@ -32,9 +33,9 @@ type Runnable func()
 
 type Worker struct {
        id          int
-       shutdown    bool
+       shutdown    atomic.Bool
        runnable    Runnable
-       interrupted bool
+       interrupted atomic.Bool
        executor    *Executor
 }
 
@@ -43,7 +44,7 @@ func (w *Worker) work() {
                if recovered := recover(); recovered != nil {
                        log.Error().Msgf("Recovering from panic()=%v", recovered)
                }
-               if !w.shutdown {
+               if !w.shutdown.Load() {
                        // if we are not in shutdown we continue
                        w.work()
                }
@@ -53,12 +54,12 @@ func (w *Worker) work() {
                workerLog = zerolog.Nop()
        }
 
-       for !w.shutdown {
+       for !w.shutdown.Load() {
                workerLog.Debug().Msg("Working")
                select {
                case workItem := <-w.executor.queue:
                        workerLog.Debug().Msgf("Got work item %v", workItem)
-                       if workItem.completionFuture.cancelRequested || (w.shutdown && w.interrupted) {
+                       if workItem.completionFuture.cancelRequested.Load() || (w.shutdown.Load() && w.interrupted.Load()) {
                                workerLog.Debug().Msg("We need to stop")
                                // TODO: do we need to complete with a error?
                        } else {
@@ -93,27 +94,22 @@ type Executor struct {
        traceWorkers bool
 }
 
-func NewFixedSizeExecutor(numberOfWorkers int, options ...ExecutorOption) *Executor {
+func NewFixedSizeExecutor(numberOfWorkers, queueDepth int, options ...ExecutorOption) *Executor {
        workers := make([]*Worker, numberOfWorkers)
        for i := 0; i < numberOfWorkers; i++ {
                workers[i] = &Worker{
-                       id:          i,
-                       shutdown:    false,
-                       runnable:    nil,
-                       interrupted: false,
-                       executor:    nil,
+                       id: i,
                }
        }
        executor := &Executor{
-               queue:  make(chan WorkItem, 100),
+               queue:  make(chan WorkItem, queueDepth),
                worker: workers,
        }
        for _, option := range options {
                option(executor)
        }
        for i := 0; i < numberOfWorkers; i++ {
-               worker := workers[i]
-               worker.executor = executor
+               workers[i].executor = executor
        }
        return executor
 }
@@ -163,8 +159,8 @@ func (e *Executor) Stop() {
        close(e.queue)
        for i := 0; i < len(e.worker); i++ {
                worker := e.worker[i]
-               worker.shutdown = true
-               worker.interrupted = true
+               worker.shutdown.Store(true)
+               worker.interrupted.Store(true)
        }
        e.running = false
 }
@@ -175,30 +171,30 @@ type CompletionFuture interface {
 }
 
 type future struct {
-       cancelRequested    bool
-       interruptRequested bool
-       completed          bool
-       errored            bool
-       err                error
+       cancelRequested    atomic.Bool
+       interruptRequested atomic.Bool
+       completed          atomic.Bool
+       errored            atomic.Bool
+       err                atomic.Value
 }
 
 func (f *future) Cancel(interrupt bool, err error) {
-       f.cancelRequested = true
-       f.interruptRequested = interrupt
-       f.errored = true
-       f.err = err
+       f.cancelRequested.Store(true)
+       f.interruptRequested.Store(interrupt)
+       f.errored.Store(true)
+       f.err.Store(err)
 }
 
 func (f *future) complete() {
-       f.completed = true
+       f.completed.Store(true)
 }
 
 func (f *future) AwaitCompletion(ctx context.Context) error {
-       for !f.completed && !f.errored && ctx.Err() != nil {
+       for !f.completed.Load() && !f.errored.Load() && ctx.Err() != nil {
                time.Sleep(time.Millisecond * 10)
        }
        if err := ctx.Err(); err != nil {
                return err
        }
-       return f.err
+       return f.err.Load().(error)
 }

Reply via email to