This is an automated email from the ASF dual-hosted git repository.

sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git


The following commit(s) were added to refs/heads/develop by this push:
     new 7efcb36f3e refactor(plc4go/spi): generify WorkerPool
7efcb36f3e is described below

commit 7efcb36f3ea0d61fb0e44f52902e23e6b843122c
Author: Sebastian Rühl <[email protected]>
AuthorDate: Tue Mar 21 11:42:50 2023 +0100

    refactor(plc4go/spi): generify WorkerPool
---
 plc4go/spi/RequestTransactionManager.go |  2 +-
 plc4go/spi/utils/WorkerPool.go          | 45 ++++++++++++++++++++-------------
 2 files changed, 29 insertions(+), 18 deletions(-)

diff --git a/plc4go/spi/RequestTransactionManager.go b/plc4go/spi/RequestTransactionManager.go
index 10c72f5ac7..b914f8b969 100644
--- a/plc4go/spi/RequestTransactionManager.go
+++ b/plc4go/spi/RequestTransactionManager.go
@@ -37,7 +37,7 @@ import (
 var sharedExecutorInstance utils.Executor // shared instance
 
 func init() {
-       sharedExecutorInstance = *utils.NewFixedSizeExecutor(runtime.NumCPU())
+       sharedExecutorInstance = *utils.NewFixedSizeExecutor(runtime.NumCPU(), utils.WithExecutorOptionTracerWorkers(config.TraceTransactionManagerWorkers))
        sharedExecutorInstance.Start()
 }
 
diff --git a/plc4go/spi/utils/WorkerPool.go b/plc4go/spi/utils/WorkerPool.go
index e24fdcc8f7..38d74d00a2 100644
--- a/plc4go/spi/utils/WorkerPool.go
+++ b/plc4go/spi/utils/WorkerPool.go
@@ -21,7 +21,6 @@ package utils
 
 import (
        "fmt"
-       "github.com/apache/plc4x/plc4go/pkg/api/config"
        "github.com/rs/zerolog"
        "github.com/rs/zerolog/log"
        "sync"
@@ -49,7 +48,7 @@ func (w *Worker) work() {
                }
        }()
        workerLog := log.With().Int("Worker id", w.id).Logger()
-       if !config.TraceTransactionManagerWorkers {
+       if !w.executor.traceWorkers {
                workerLog = zerolog.Nop()
        }
 
@@ -75,24 +74,25 @@ func (w *Worker) work() {
 }
 
 type WorkItem struct {
-       transactionId    int32
+       workItemId       int32
        runnable         Runnable
        completionFuture *CompletionFuture
 }
 
 func (w *WorkItem) String() string {
-       return fmt.Sprintf("Workitem{tid:%d}", w.transactionId)
+       return fmt.Sprintf("Workitem{wid:%d}", w.workItemId)
 }
 
 type Executor struct {
-       running     bool
-       shutdown    bool
-       stateChange sync.Mutex
-       worker      []*Worker
-       queue       chan WorkItem
+       running      bool
+       shutdown     bool
+       stateChange  sync.Mutex
+       worker       []*Worker
+       queue        chan WorkItem
+       traceWorkers bool
 }
 
-func NewFixedSizeExecutor(numberOfWorkers int) *Executor {
+func NewFixedSizeExecutor(numberOfWorkers int, options ...ExecutorOption) *Executor {
        workers := make([]*Worker, numberOfWorkers)
        for i := 0; i < numberOfWorkers; i++ {
                workers[i] = &Worker{
@@ -103,27 +103,38 @@ func NewFixedSizeExecutor(numberOfWorkers int) *Executor {
                        executor:    nil,
                }
        }
-       executor := Executor{
+       executor := &Executor{
                queue:  make(chan WorkItem, 100),
                worker: workers,
        }
+       for _, option := range options {
+               option(executor)
+       }
        for i := 0; i < numberOfWorkers; i++ {
                worker := workers[i]
-               worker.executor = &executor
+               worker.executor = executor
+       }
+       return executor
+}
+
+type ExecutorOption func(*Executor)
+
+func WithExecutorOptionTracerWorkers(traceWorkers bool) ExecutorOption {
+       return func(executor *Executor) {
+               executor.traceWorkers = traceWorkers
        }
-       return &executor
 }
 
-func (e *Executor) Submit(transactionId int32, runnable Runnable) *CompletionFuture {
-       log.Trace().Int32("transactionId", transactionId).Msg("Submitting runnable")
+func (e *Executor) Submit(workItemId int32, runnable Runnable) *CompletionFuture {
+       log.Trace().Int32("workItemId", workItemId).Msg("Submitting runnable")
        completionFuture := &CompletionFuture{}
        // TODO: add select and timeout if queue is full
        e.queue <- WorkItem{
-               transactionId:    transactionId,
+               workItemId:       workItemId,
                runnable:         runnable,
                completionFuture: completionFuture,
        }
-       log.Trace().Int32("transactionId", transactionId).Msg("runnable queued")
+       log.Trace().Int32("workItemId", workItemId).Msg("runnable queued")
        return completionFuture
 }
 

Reply via email to