This is an automated email from the ASF dual-hosted git repository.
sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git
The following commit(s) were added to refs/heads/develop by this push:
new 48c398c916 feat(plc4go/spi): use atomic.Bool for state changes on
WorkerPool
48c398c916 is described below
commit 48c398c916a4dd07f7ee9024fc11b19db3330664
Author: Sebastian Rühl <[email protected]>
AuthorDate: Tue Mar 21 12:42:36 2023 +0100
feat(plc4go/spi): use atomic.Bool for state changes on WorkerPool
---
plc4go/spi/RequestTransactionManager.go | 2 +-
plc4go/spi/utils/WorkerPool.go | 52 +++++++++++++++------------------
2 files changed, 25 insertions(+), 29 deletions(-)
diff --git a/plc4go/spi/RequestTransactionManager.go
b/plc4go/spi/RequestTransactionManager.go
index a6b5268559..062d877e34 100644
--- a/plc4go/spi/RequestTransactionManager.go
+++ b/plc4go/spi/RequestTransactionManager.go
@@ -38,7 +38,7 @@ import (
var sharedExecutorInstance utils.Executor // shared instance
func init() {
- sharedExecutorInstance = *utils.NewFixedSizeExecutor(runtime.NumCPU(),
utils.WithExecutorOptionTracerWorkers(config.TraceTransactionManagerWorkers))
+ sharedExecutorInstance = *utils.NewFixedSizeExecutor(runtime.NumCPU(),
100,
utils.WithExecutorOptionTracerWorkers(config.TraceTransactionManagerWorkers))
sharedExecutorInstance.Start()
}
diff --git a/plc4go/spi/utils/WorkerPool.go b/plc4go/spi/utils/WorkerPool.go
index 9dbb32b885..162737c215 100644
--- a/plc4go/spi/utils/WorkerPool.go
+++ b/plc4go/spi/utils/WorkerPool.go
@@ -25,6 +25,7 @@ import (
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
"sync"
+ "sync/atomic"
"time"
)
@@ -32,9 +33,9 @@ type Runnable func()
type Worker struct {
id int
- shutdown bool
+ shutdown atomic.Bool
runnable Runnable
- interrupted bool
+ interrupted atomic.Bool
executor *Executor
}
@@ -43,7 +44,7 @@ func (w *Worker) work() {
if recovered := recover(); recovered != nil {
log.Error().Msgf("Recovering from panic()=%v",
recovered)
}
- if !w.shutdown {
+ if !w.shutdown.Load() {
// if we are not in shutdown we continue
w.work()
}
@@ -53,12 +54,12 @@ func (w *Worker) work() {
workerLog = zerolog.Nop()
}
- for !w.shutdown {
+ for !w.shutdown.Load() {
workerLog.Debug().Msg("Working")
select {
case workItem := <-w.executor.queue:
workerLog.Debug().Msgf("Got work item %v", workItem)
- if workItem.completionFuture.cancelRequested ||
(w.shutdown && w.interrupted) {
+ if workItem.completionFuture.cancelRequested.Load() ||
(w.shutdown.Load() && w.interrupted.Load()) {
workerLog.Debug().Msg("We need to stop")
// TODO: do we need to complete with a error?
} else {
@@ -93,27 +94,22 @@ type Executor struct {
traceWorkers bool
}
-func NewFixedSizeExecutor(numberOfWorkers int, options ...ExecutorOption)
*Executor {
+func NewFixedSizeExecutor(numberOfWorkers, queueDepth int, options
...ExecutorOption) *Executor {
workers := make([]*Worker, numberOfWorkers)
for i := 0; i < numberOfWorkers; i++ {
workers[i] = &Worker{
- id: i,
- shutdown: false,
- runnable: nil,
- interrupted: false,
- executor: nil,
+ id: i,
}
}
executor := &Executor{
- queue: make(chan WorkItem, 100),
+ queue: make(chan WorkItem, queueDepth),
worker: workers,
}
for _, option := range options {
option(executor)
}
for i := 0; i < numberOfWorkers; i++ {
- worker := workers[i]
- worker.executor = executor
+ workers[i].executor = executor
}
return executor
}
@@ -163,8 +159,8 @@ func (e *Executor) Stop() {
close(e.queue)
for i := 0; i < len(e.worker); i++ {
worker := e.worker[i]
- worker.shutdown = true
- worker.interrupted = true
+ worker.shutdown.Store(true)
+ worker.interrupted.Store(true)
}
e.running = false
}
@@ -175,30 +171,30 @@ type CompletionFuture interface {
}
type future struct {
- cancelRequested bool
- interruptRequested bool
- completed bool
- errored bool
- err error
+ cancelRequested atomic.Bool
+ interruptRequested atomic.Bool
+ completed atomic.Bool
+ errored atomic.Bool
+ err atomic.Value
}
func (f *future) Cancel(interrupt bool, err error) {
- f.cancelRequested = true
- f.interruptRequested = interrupt
- f.errored = true
- f.err = err
+ f.cancelRequested.Store(true)
+ f.interruptRequested.Store(interrupt)
+ f.errored.Store(true)
+ f.err.Store(err)
}
func (f *future) complete() {
- f.completed = true
+ f.completed.Store(true)
}
func (f *future) AwaitCompletion(ctx context.Context) error {
- for !f.completed && !f.errored && ctx.Err() != nil {
+ for !f.completed.Load() && !f.errored.Load() && ctx.Err() != nil {
time.Sleep(time.Millisecond * 10)
}
if err := ctx.Err(); err != nil {
return err
}
- return f.err
+ return f.err.Load().(error)
}