This is an automated email from the ASF dual-hosted git repository.

sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git

commit a7bdaeaac4a2f081a90537c6b2000eb9bf7d17f5
Author: Sebastian Rühl <[email protected]>
AuthorDate: Mon Nov 10 16:09:31 2025 +0100

    feat(plc4go): add names to executors and refine ids.
---
 plc4go/spi/pool/WorkerPool.go                      |  4 +-
 plc4go/spi/pool/WorkerPool_test.go                 | 10 +++--
 plc4go/spi/pool/dynamicExecutor.go                 | 13 ++++---
 plc4go/spi/pool/executor.go                        | 12 +++++-
 plc4go/spi/pool/executor_plc4xgen.go               |  8 ++++
 plc4go/spi/pool/executor_test.go                   |  2 +-
 plc4go/spi/pool/worker.go                          | 10 ++---
 plc4go/spi/pool/worker_plc4xgen.go                 |  2 +-
 plc4go/spi/pool/worker_test.go                     | 18 ++++-----
 .../transactions/RequestTransactionManager_test.go | 43 ++++++++++------------
 10 files changed, 71 insertions(+), 51 deletions(-)

diff --git a/plc4go/spi/pool/WorkerPool.go b/plc4go/spi/pool/WorkerPool.go
index 8bbc1c0149..0e018d04e7 100644
--- a/plc4go/spi/pool/WorkerPool.go
+++ b/plc4go/spi/pool/WorkerPool.go
@@ -21,6 +21,7 @@ package pool
 
 import (
        "context"
+       "fmt"
        "io"
        "time"
 
@@ -54,7 +55,8 @@ func NewDynamicExecutor(maxNumberOfWorkers, queueDepth int, 
_options ...options.
        _executor := newDynamicExecutor(queueDepth, maxNumberOfWorkers, 
customLogger)
        _executor.traceWorkers, _ = options.ExtractTracerWorkers(_options...)
        // We spawn one initial worker
-       w := newWorker(customLogger, 0, _executor)
+       workerId := fmt.Sprintf("%s-worker-%d", _executor.name, 0)
+       w := newWorker(customLogger, workerId, _executor)
        w.lastReceived.Store(time.Now()) // We store the current timestamp so 
the worker isn't cut of instantly by the worker killer
        _executor.worker = append(_executor.worker, w)
        return _executor
diff --git a/plc4go/spi/pool/WorkerPool_test.go 
b/plc4go/spi/pool/WorkerPool_test.go
index 19f4b61d42..343567f599 100644
--- a/plc4go/spi/pool/WorkerPool_test.go
+++ b/plc4go/spi/pool/WorkerPool_test.go
@@ -143,7 +143,8 @@ func TestNewDynamicExecutor(t *testing.T) {
                                t.Cleanup(wg.Wait)
                                wg.Go(func() {
                                        for i := 0; i < 500; i++ {
-                                               e.workItems <- workItem{
+                                               select {
+                                               case e.workItems <- workItem{
                                                        workItemId: int32(i),
                                                        runnable: 
func(runnableCtx context.Context) {
                                                                ctx, cancel := 
context.WithCancel(t.Context())
@@ -158,7 +159,10 @@ func TestNewDynamicExecutor(t *testing.T) {
                                                                case 
<-ctx.Done():
                                                                }
                                                        },
-                                                       completionFuture: 
&future{},
+                                                       completionFuture: 
&future{}}:
+                                                       t.Logf("Item %d added", 
i)
+                                               case <-t.Context().Done():
+                                                       return
                                                }
                                        }
                                })
@@ -177,7 +181,7 @@ func TestNewDynamicExecutor(t *testing.T) {
                                tt.setup(t, &tt.args)
                        }
                        dynamicSizedExecutor := 
NewDynamicExecutor(tt.args.numberOfWorkers, tt.args.queueDepth, 
tt.args.options...)
-                       defer dynamicSizedExecutor.Stop()
+                       t.Cleanup(dynamicSizedExecutor.Stop)
                        if tt.manipulator != nil {
                                tt.manipulator(t, 
dynamicSizedExecutor.(*dynamicExecutor))
                        }
diff --git a/plc4go/spi/pool/dynamicExecutor.go 
b/plc4go/spi/pool/dynamicExecutor.go
index f3796d6217..6c61f1afa6 100644
--- a/plc4go/spi/pool/dynamicExecutor.go
+++ b/plc4go/spi/pool/dynamicExecutor.go
@@ -20,6 +20,7 @@
 package pool
 
 import (
+       "fmt"
        "runtime/debug"
        "sync"
        "sync/atomic"
@@ -95,11 +96,11 @@ func (e *dynamicExecutor) Start() {
                                Msg("Checking if numberOfItemsInQueue > 
numberOfWorkers && numberOfWorkers < maxNumberOfWorkers")
                        if numberOfItemsInQueue > numberOfWorkers && 
numberOfWorkers < e.maxNumberOfWorkers {
                                workerLog.Trace().Msg("spawning new worker")
-                               workerId := numberOfWorkers - 1
+                               workerId := fmt.Sprintf("%s-worker-%d", e.name, 
numberOfWorkers-1)
                                _worker := newWorker(e.log, workerId, e)
                                _worker.lastReceived.Store(time.Now()) // We 
store the current timestamp so the worker isn't cut of instantly by the worker 
killer
                                e.worker = append(e.worker, _worker)
-                               workerLog.Info().Int("Worker id", 
_worker.id).Msg("spawning")
+                               workerLog.Info().Str("Worker id", 
_worker.id).Msg("spawning")
                                _worker.start()
                                e.currentNumberOfWorkers.Add(1)
                        } else {
@@ -140,16 +141,16 @@ func (e *dynamicExecutor) Start() {
                        for _, _worker := range e.worker {
                                deadline := time.Now().Add(-timeToBecomeUnused)
                                workerLog.Debug().
-                                       Int("workerId", _worker.id).
+                                       Str("workerId", _worker.id).
                                        Time("lastReceived", 
_worker.lastReceived.Load().(time.Time)).
                                        Time("deadline", deadline).
                                        Msg("Checking if lastReceived is before 
deadline")
                                if 
_worker.lastReceived.Load().(time.Time).Before(deadline) {
-                                       workerLog.Info().Int("Worker id", 
_worker.id).Msg("killing")
+                                       workerLog.Info().Str("Worker id", 
_worker.id).Msg("killing")
                                        _worker.stop(true)
                                        e.currentNumberOfWorkers.Add(-1)
                                } else {
-                                       workerLog.Debug().Int("Worker id", 
_worker.id).Msg("still ok")
+                                       workerLog.Debug().Str("Worker id", 
_worker.id).Msg("still ok")
                                        newWorkers = append(newWorkers, _worker)
                                        workersChanged = true
                                }
@@ -175,7 +176,7 @@ func (e *dynamicExecutor) Start() {
 }
 
 func (e *dynamicExecutor) Stop() {
-       defer utils.StopWarn(e.log)()
+       defer utils.StopWarn(e.log, utils.WithStopWarnProcessId(e.name))()
        e.log.Trace().Msg("stopping now")
        e.dynamicStateChange.Lock()
        defer e.dynamicStateChange.Unlock()
diff --git a/plc4go/spi/pool/executor.go b/plc4go/spi/pool/executor.go
index a63e6e8bc3..ae5f750b94 100644
--- a/plc4go/spi/pool/executor.go
+++ b/plc4go/spi/pool/executor.go
@@ -21,6 +21,7 @@ package pool
 
 import (
        "context"
+       "fmt"
        "sync"
        "sync/atomic"
 
@@ -30,12 +31,18 @@ import (
        "github.com/apache/plc4x/plc4go/spi/utils"
 )
 
+// only used to avoid name collision when no custom name is used.
+var defaultExecutorNameUsage atomic.Uint64
+
 //go:generate go tool plc4xGenerator -type=executor
 type executor struct {
+       name string
+
        running  bool
        shutdown bool
 
        worker       []*worker
+       workerNumber atomic.Uint32
        workItems    chan workItem
        traceWorkers bool
 
@@ -50,13 +57,14 @@ type executor struct {
 
 func newExecutor(queueDepth int, numberOfInitialWorkers int, customLogger 
zerolog.Logger) *executor {
        e := &executor{
+               name:      fmt.Sprintf("executor-%d", 
defaultExecutorNameUsage.Add(1)),
                workItems: make(chan workItem, queueDepth),
                log:       customLogger,
        }
        e.ctx, e.ctxCancel = context.WithCancel(context.Background())
        workers := make([]*worker, numberOfInitialWorkers)
        for i := 0; i < numberOfInitialWorkers; i++ {
-               w := newWorker(customLogger, i, e)
+               w := newWorker(customLogger, fmt.Sprintf("%s-worker-%d", 
e.name, i), e)
                workers[i] = w
        }
        e.worker = workers
@@ -125,7 +133,7 @@ func (e *executor) Start() {
 }
 
 func (e *executor) Stop() {
-       defer utils.StopWarn(e.log)()
+       defer utils.StopWarn(e.log, utils.WithStopWarnProcessId(e.name))()
        e.log.Trace().Msg("stopping now")
        e.stateChange.Lock()
        defer e.stateChange.Unlock()
diff --git a/plc4go/spi/pool/executor_plc4xgen.go 
b/plc4go/spi/pool/executor_plc4xgen.go
index 48aa2406f1..ec8ea6ee06 100644
--- a/plc4go/spi/pool/executor_plc4xgen.go
+++ b/plc4go/spi/pool/executor_plc4xgen.go
@@ -49,6 +49,10 @@ func (d *executor) SerializeWithWriteBuffer(ctx 
context.Context, writeBuffer uti
                return err
        }
 
+       if err := writeBuffer.WriteString("name", uint32(len(d.name)*8), 
d.name); err != nil {
+               return err
+       }
+
        if err := writeBuffer.WriteBit("running", d.running); err != nil {
                return err
        }
@@ -85,6 +89,10 @@ func (d *executor) SerializeWithWriteBuffer(ctx 
context.Context, writeBuffer uti
                return err
        }
 
+       if err := writeBuffer.WriteUint32("workerNumber", 32, 
d.workerNumber.Load()); err != nil {
+               return err
+       }
+
        _workItems_plx4gen_description := fmt.Sprintf("%d element(s)", 
len(d.workItems))
        if err := writeBuffer.WriteString("workItems", 
uint32(len(_workItems_plx4gen_description)*8), _workItems_plx4gen_description); 
err != nil {
                return err
diff --git a/plc4go/spi/pool/executor_test.go b/plc4go/spi/pool/executor_test.go
index b67d6bcd22..b96fd07957 100644
--- a/plc4go/spi/pool/executor_test.go
+++ b/plc4go/spi/pool/executor_test.go
@@ -460,7 +460,7 @@ func Test_executor_String(t *testing.T) {
                                shutdown: true,
                                worker: []*worker{
                                        {
-                                               id:          1,
+                                               id:          "1",
                                                shutdown:    atomic.Bool{},
                                                interrupted: atomic.Bool{},
                                                lastReceived: func() 
atomic.Value {
diff --git a/plc4go/spi/pool/worker.go b/plc4go/spi/pool/worker.go
index 60b95cbe6e..f2e93a03cb 100644
--- a/plc4go/spi/pool/worker.go
+++ b/plc4go/spi/pool/worker.go
@@ -31,7 +31,7 @@ import (
 
 //go:generate go tool plc4xGenerator -type=worker
 type worker struct {
-       id       int
+       id       string
        executor interface {
                isTraceWorkers() bool
                getWorksItems() chan workItem
@@ -50,7 +50,7 @@ type worker struct {
        log zerolog.Logger
 }
 
-func newWorker(localLog zerolog.Logger, workerId int, executor interface {
+func newWorker(localLog zerolog.Logger, workerId string, executor interface {
        isTraceWorkers() bool
        getWorksItems() chan workItem
        getWorkerWaitGroup() *sync.WaitGroup
@@ -59,7 +59,7 @@ func newWorker(localLog zerolog.Logger, workerId int, 
executor interface {
        w := &worker{
                id:       workerId,
                executor: executor,
-               log:      localLog.With().Int("workerId", workerId).Logger(),
+               log:      localLog.With().Str("workerId", workerId).Logger(),
        }
        w.initialize()
        return w
@@ -79,7 +79,7 @@ func (w *worker) start() {
        w.stateChange.Lock()
        defer w.stateChange.Unlock()
        if w.running.Load() {
-               w.log.Warn().Int("Worker id", w.id).Msg("Worker already 
started")
+               w.log.Warn().Msg("Worker already started")
                return
        }
        if w.executor.isTraceWorkers() {
@@ -93,7 +93,7 @@ func (w *worker) stop(interrupt bool) {
        w.stateChange.Lock()
        defer w.stateChange.Unlock()
        if !w.running.Load() {
-               w.log.Warn().Int("Worker id", w.id).Msg("Worker not running")
+               w.log.Warn().Msg("Worker not running")
                return
        }
 
diff --git a/plc4go/spi/pool/worker_plc4xgen.go 
b/plc4go/spi/pool/worker_plc4xgen.go
index 91e4ba903b..bbadc1b8fa 100644
--- a/plc4go/spi/pool/worker_plc4xgen.go
+++ b/plc4go/spi/pool/worker_plc4xgen.go
@@ -49,7 +49,7 @@ func (d *worker) SerializeWithWriteBuffer(ctx 
context.Context, writeBuffer utils
                return err
        }
 
-       if err := writeBuffer.WriteInt64("id", 64, int64(d.id)); err != nil {
+       if err := writeBuffer.WriteString("id", uint32(len(d.id)*8), d.id); err 
!= nil {
                return err
        }
 
diff --git a/plc4go/spi/pool/worker_test.go b/plc4go/spi/pool/worker_test.go
index 1e1a8060ab..a3abd9dfef 100644
--- a/plc4go/spi/pool/worker_test.go
+++ b/plc4go/spi/pool/worker_test.go
@@ -32,7 +32,7 @@ import (
 
 func Test_worker_initialize(t *testing.T) {
        type fields struct {
-               id          int
+               id          string
                interrupter chan struct{}
                executor    interface {
                        isTraceWorkers() bool
@@ -64,7 +64,7 @@ func Test_worker_initialize(t *testing.T) {
 
 func Test_worker_start(t *testing.T) {
        type fields struct {
-               id       int
+               id       string
                executor interface {
                        isTraceWorkers() bool
                        getWorksItems() chan workItem
@@ -144,7 +144,7 @@ func Test_worker_start(t *testing.T) {
 
 func Test_worker_stop(t *testing.T) {
        type fields struct {
-               id       int
+               id       string
                executor interface {
                        isTraceWorkers() bool
                        getWorksItems() chan workItem
@@ -210,7 +210,7 @@ func Test_worker_stop(t *testing.T) {
 
 func Test_worker_work(t *testing.T) {
        type fields struct {
-               id       int
+               id       string
                executor *executor
        }
        tests := []struct {
@@ -226,7 +226,7 @@ func Test_worker_work(t *testing.T) {
                {
                        name: "Worker should work till shutdown (even if it 
panics)",
                        fields: fields{
-                               id: 0,
+                               id: "0",
                                executor: func() *executor {
                                        e := &executor{
                                                workItems:    make(chan 
workItem),
@@ -262,7 +262,7 @@ func Test_worker_work(t *testing.T) {
                {
                        name: "Worker should work till shutdown",
                        fields: fields{
-                               id: 1,
+                               id: "1",
                                executor: func() *executor {
                                        e := &executor{
                                                workItems:    make(chan 
workItem),
@@ -297,7 +297,7 @@ func Test_worker_work(t *testing.T) {
                {
                        name: "Work interrupted",
                        fields: fields{
-                               id: 1,
+                               id: "1",
                                executor: func() *executor {
                                        e := &executor{
                                                workItems:    make(chan 
workItem),
@@ -322,7 +322,7 @@ func Test_worker_work(t *testing.T) {
                {
                        name: "Work on canceled",
                        fields: fields{
-                               id: 1,
+                               id: "1",
                                executor: func() *executor {
                                        e := &executor{
                                                workItems:    make(chan 
workItem),
@@ -386,7 +386,7 @@ func Test_worker_work(t *testing.T) {
 
 func Test_worker_String(t *testing.T) {
        type fields struct {
-               id int
+               id string
        }
        tests := []struct {
                name   string
diff --git a/plc4go/spi/transactions/RequestTransactionManager_test.go 
b/plc4go/spi/transactions/RequestTransactionManager_test.go
index 484894d660..8dc361bd9e 100644
--- a/plc4go/spi/transactions/RequestTransactionManager_test.go
+++ b/plc4go/spi/transactions/RequestTransactionManager_test.go
@@ -664,29 +664,26 @@ func Test_requestTransactionManager_String(t *testing.T) {
                                traceTransactionManagerTransactions: true,
                        },
                        want: `
-╔═requestTransactionManager═══════════════════════════════════════════════════════════════════════════════════════════╗
-║╔═runningRequests/value/requestTransaction╗╔═numberOfConcurrentRequests╗╔═currentTransactionId╗
                      ║
-║║      ╔═transactionId╗╔═completed╗       ║║   0x0000000000000003 3    ║║    
0x00000004 4     ║                      ║
-║║      ║ 0x00000002 2 ║║ b0 false ║       
║╚═══════════════════════════╝╚═════════════════════╝                      ║
-║║      ╚══════════════╝╚══════════╝       ║                                   
                                       ║
-║╚═════════════════════════════════════════╝                                   
                                       ║
-║╔═executor/executor══════════════════════════════════════════════════════════════════════════════════════╗╔═shutdown╗║
-║║╔═running╗╔═shutdown╗                                                        
                           ║║b0 false ║║
-║║║b0 false║║b0 false ║                                                        
                           ║╚═════════╝║
-║║╚════════╝╚═════════╝                                                        
                           ║           ║
-║║╔═worker/value/worker══════════════════════════════════════════════════════════════════════════════════╗║
           ║
-║║║╔═id═════════════════╗╔═lastReceived════════════════╗╔═running╗╔═shutdown╗╔═interrupted╗╔═interrupter╗║║
           ║
-║║║║0x0000000000000000 0║║0001-01-01 00:00:00 +0000 UTC║║b0 false║║b0 false ║║ 
 b0 false  ║║0 element(s)║║║           ║
-║║║╚════════════════════╝╚═════════════════════════════╝╚════════╝╚═════════╝╚════════════╝╚════════════╝║║
           ║
-║║╚══════════════════════════════════════════════════════════════════════════════════════════════════════╝║
           ║
-║║╔═workItems══╗╔═traceWorkers╗                                                
                           ║           ║
-║║║0 element(s)║║  b0 false   ║                                                
                           ║           ║
-║║╚════════════╝╚═════════════╝                                                
                           ║           ║
-║╚════════════════════════════════════════════════════════════════════════════════════════════════════════╝
           ║
-║╔═traceTransactionManagerTransactions╗                                        
                                       ║
-║║              b1 true               ║                                        
                                       ║
-║╚════════════════════════════════════╝                                        
                                       ║
-╚═════════════════════════════════════════════════════════════════════════════════════════════════════════════════════╝`[1:],
+╔═requestTransactionManager══════════════════════════════════════════════════════════════════════════════════════════╗
+║╔═runningRequests/value/requestTransaction╗╔═numberOfConcurrentRequests╗╔═currentTransactionId╗
                     ║
+║║      ╔═transactionId╗╔═completed╗       ║║   0x0000000000000003 3    ║║    
0x00000004 4     ║                     ║
+║║      ║ 0x00000002 2 ║║ b0 false ║       
║╚═══════════════════════════╝╚═════════════════════╝                     ║
+║║      ╚══════════════╝╚══════════╝       ║                                   
                                      ║
+║╚═════════════════════════════════════════╝                                   
                                      ║
+║╔═executor/executor════════════════════════════════════════════════════════════════════════════════════════════════╗║
+║║╔═running╗╔═shutdown╗╔═worker/value/worker═══════════════════════════════════════════════════════════════════════╗║║
+║║║b0 false║║b0 false 
║║╔═id══════╗╔═lastReceived════════════════╗╔═running╗╔═shutdown╗╔═interrupted╗╔═interrupter╗║║║
+║║╚════════╝╚═════════╝║║-worker-0║║0001-01-01 00:00:00 +0000 UTC║║b0 
false║║b0 false ║║  b0 false  ║║0 element(s)║║║║
+║║                     
║╚═════════╝╚═════════════════════════════╝╚════════╝╚═════════╝╚════════════╝╚════════════╝║║║
+║║                     
╚═══════════════════════════════════════════════════════════════════════════════════════════╝║║
+║║╔═workerNumber╗╔═workItems══╗╔═traceWorkers╗╔═ctx═════════════════════════╗╔═ctxCancel╗
                           ║║
+║║║0x00000000 0 ║║0 element(s)║║  b0 false   ║║context.Background.WithCancel║║ 
0xc4760  ║                           ║║
+║║╚═════════════╝╚════════════╝╚═════════════╝╚═════════════════════════════╝╚══════════╝
                           ║║
+║╚══════════════════════════════════════════════════════════════════════════════════════════════════════════════════╝║
+║╔═shutdown╗╔═traceTransactionManagerTransactions╗                             
                                      ║
+║║b0 false ║║              b1 true               ║                             
                                      ║
+║╚═════════╝╚════════════════════════════════════╝                             
                                      ║
+╚════════════════════════════════════════════════════════════════════════════════════════════════════════════════════╝`[1:],
                },
        }
        for _, tt := range tests {

Reply via email to