This is an automated email from the ASF dual-hosted git repository. sruehl pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/plc4x.git
commit a7bdaeaac4a2f081a90537c6b2000eb9bf7d17f5 Author: Sebastian Rühl <[email protected]> AuthorDate: Mon Nov 10 16:09:31 2025 +0100 feat(plc4go): add names to executors and refine ids. --- plc4go/spi/pool/WorkerPool.go | 4 +- plc4go/spi/pool/WorkerPool_test.go | 10 +++-- plc4go/spi/pool/dynamicExecutor.go | 13 ++++--- plc4go/spi/pool/executor.go | 12 +++++- plc4go/spi/pool/executor_plc4xgen.go | 8 ++++ plc4go/spi/pool/executor_test.go | 2 +- plc4go/spi/pool/worker.go | 10 ++--- plc4go/spi/pool/worker_plc4xgen.go | 2 +- plc4go/spi/pool/worker_test.go | 18 ++++----- .../transactions/RequestTransactionManager_test.go | 43 ++++++++++------------ 10 files changed, 71 insertions(+), 51 deletions(-) diff --git a/plc4go/spi/pool/WorkerPool.go b/plc4go/spi/pool/WorkerPool.go index 8bbc1c0149..0e018d04e7 100644 --- a/plc4go/spi/pool/WorkerPool.go +++ b/plc4go/spi/pool/WorkerPool.go @@ -21,6 +21,7 @@ package pool import ( "context" + "fmt" "io" "time" @@ -54,7 +55,8 @@ func NewDynamicExecutor(maxNumberOfWorkers, queueDepth int, _options ...options. _executor := newDynamicExecutor(queueDepth, maxNumberOfWorkers, customLogger) _executor.traceWorkers, _ = options.ExtractTracerWorkers(_options...) // We spawn one initial worker - w := newWorker(customLogger, 0, _executor) + workerId := fmt.Sprintf("%s-worker-%d", _executor.name, 0) + w := newWorker(customLogger, workerId, _executor) w.lastReceived.Store(time.Now()) // We store the current timestamp so the worker isn't cut of instantly by the worker killer _executor.worker = append(_executor.worker, w) return _executor diff --git a/plc4go/spi/pool/WorkerPool_test.go b/plc4go/spi/pool/WorkerPool_test.go index 19f4b61d42..343567f599 100644 --- a/plc4go/spi/pool/WorkerPool_test.go +++ b/plc4go/spi/pool/WorkerPool_test.go @@ -143,7 +143,8 @@ func TestNewDynamicExecutor(t *testing.T) { t.Cleanup(wg.Wait) wg.Go(func() { for i := 0; i < 500; i++ { - e.workItems <- workItem{ + select { + case e.workItems <- workItem{ workItemId: int32(i), runnable: func(runnableCtx context.Context) { ctx, cancel := context.WithCancel(t.Context()) @@ -158,7 +159,10 @@ func TestNewDynamicExecutor(t *testing.T) { case <-ctx.Done(): } }, - completionFuture: &future{}, + completionFuture: &future{}}: + t.Logf("Item %d added", i) + case <-t.Context().Done(): + return } } }) @@ -177,7 +181,7 @@ func TestNewDynamicExecutor(t *testing.T) { tt.setup(t, &tt.args) } dynamicSizedExecutor := NewDynamicExecutor(tt.args.numberOfWorkers, tt.args.queueDepth, tt.args.options...) - defer dynamicSizedExecutor.Stop() + t.Cleanup(dynamicSizedExecutor.Stop) if tt.manipulator != nil { tt.manipulator(t, dynamicSizedExecutor.(*dynamicExecutor)) } diff --git a/plc4go/spi/pool/dynamicExecutor.go b/plc4go/spi/pool/dynamicExecutor.go index f3796d6217..6c61f1afa6 100644 --- a/plc4go/spi/pool/dynamicExecutor.go +++ b/plc4go/spi/pool/dynamicExecutor.go @@ -20,6 +20,7 @@ package pool import ( + "fmt" "runtime/debug" "sync" "sync/atomic" @@ -95,11 +96,11 @@ func (e *dynamicExecutor) Start() { Msg("Checking if numberOfItemsInQueue > numberOfWorkers && numberOfWorkers < maxNumberOfWorkers") if numberOfItemsInQueue > numberOfWorkers && numberOfWorkers < e.maxNumberOfWorkers { workerLog.Trace().Msg("spawning new worker") - workerId := numberOfWorkers - 1 + workerId := fmt.Sprintf("%s-worker-%d", e.name, numberOfWorkers-1) _worker := newWorker(e.log, workerId, e) _worker.lastReceived.Store(time.Now()) // We store the current timestamp so the worker isn't cut of instantly by the worker killer e.worker = append(e.worker, _worker) - workerLog.Info().Int("Worker id", _worker.id).Msg("spawning") + workerLog.Info().Str("Worker id", _worker.id).Msg("spawning") _worker.start() e.currentNumberOfWorkers.Add(1) } else { @@ -140,16 +141,16 @@ func (e *dynamicExecutor) Start() { for _, _worker := range e.worker { deadline := time.Now().Add(-timeToBecomeUnused) workerLog.Debug(). - Int("workerId", _worker.id). + Str("workerId", _worker.id). Time("lastReceived", _worker.lastReceived.Load().(time.Time)). Time("deadline", deadline). Msg("Checking if lastReceived is before deadline") if _worker.lastReceived.Load().(time.Time).Before(deadline) { - workerLog.Info().Int("Worker id", _worker.id).Msg("killing") + workerLog.Info().Str("Worker id", _worker.id).Msg("killing") _worker.stop(true) e.currentNumberOfWorkers.Add(-1) } else { - workerLog.Debug().Int("Worker id", _worker.id).Msg("still ok") + workerLog.Debug().Str("Worker id", _worker.id).Msg("still ok") newWorkers = append(newWorkers, _worker) workersChanged = true } @@ -175,7 +176,7 @@ func (e *dynamicExecutor) Start() { } func (e *dynamicExecutor) Stop() { - defer utils.StopWarn(e.log)() + defer utils.StopWarn(e.log, utils.WithStopWarnProcessId(e.name))() e.log.Trace().Msg("stopping now") e.dynamicStateChange.Lock() defer e.dynamicStateChange.Unlock() diff --git a/plc4go/spi/pool/executor.go b/plc4go/spi/pool/executor.go index a63e6e8bc3..ae5f750b94 100644 --- a/plc4go/spi/pool/executor.go +++ b/plc4go/spi/pool/executor.go @@ -21,6 +21,7 @@ package pool import ( "context" + "fmt" "sync" "sync/atomic" @@ -30,12 +31,18 @@ import ( "github.com/apache/plc4x/plc4go/spi/utils" ) +// only used to avoid name collision when no custom name is used. +var defaultExecutorNameUsage atomic.Uint64 + //go:generate go tool plc4xGenerator -type=executor type executor struct { + name string + running bool shutdown bool worker []*worker + workerNumber atomic.Uint32 workItems chan workItem traceWorkers bool @@ -50,13 +57,14 @@ type executor struct { func newExecutor(queueDepth int, numberOfInitialWorkers int, customLogger zerolog.Logger) *executor { e := &executor{ + name: fmt.Sprintf("executor-%d", defaultExecutorNameUsage.Add(1)), workItems: make(chan workItem, queueDepth), log: customLogger, } e.ctx, e.ctxCancel = context.WithCancel(context.Background()) workers := make([]*worker, numberOfInitialWorkers) for i := 0; i < numberOfInitialWorkers; i++ { - w := newWorker(customLogger, i, e) + w := newWorker(customLogger, fmt.Sprintf("%s-worker-%d", e.name, i), e) workers[i] = w } e.worker = workers @@ -125,7 +133,7 @@ func (e *executor) Start() { } func (e *executor) Stop() { - defer utils.StopWarn(e.log)() + defer utils.StopWarn(e.log, utils.WithStopWarnProcessId(e.name))() e.log.Trace().Msg("stopping now") e.stateChange.Lock() defer e.stateChange.Unlock() diff --git a/plc4go/spi/pool/executor_plc4xgen.go b/plc4go/spi/pool/executor_plc4xgen.go index 48aa2406f1..ec8ea6ee06 100644 --- a/plc4go/spi/pool/executor_plc4xgen.go +++ b/plc4go/spi/pool/executor_plc4xgen.go @@ -49,6 +49,10 @@ func (d *executor) SerializeWithWriteBuffer(ctx context.Context, writeBuffer uti return err } + if err := writeBuffer.WriteString("name", uint32(len(d.name)*8), d.name); err != nil { + return err + } + if err := writeBuffer.WriteBit("running", d.running); err != nil { return err } @@ -85,6 +89,10 @@ func (d *executor) SerializeWithWriteBuffer(ctx context.Context, writeBuffer uti return err } + if err := writeBuffer.WriteUint32("workerNumber", 32, d.workerNumber.Load()); err != nil { + return err + } + _workItems_plx4gen_description := fmt.Sprintf("%d element(s)", len(d.workItems)) if err := writeBuffer.WriteString("workItems", uint32(len(_workItems_plx4gen_description)*8), _workItems_plx4gen_description); err != nil { return err diff --git a/plc4go/spi/pool/executor_test.go b/plc4go/spi/pool/executor_test.go index b67d6bcd22..b96fd07957 100644 --- a/plc4go/spi/pool/executor_test.go +++ b/plc4go/spi/pool/executor_test.go @@ -460,7 +460,7 @@ func Test_executor_String(t *testing.T) { shutdown: true, worker: []*worker{ { - id: 1, + id: "1", shutdown: atomic.Bool{}, interrupted: atomic.Bool{}, lastReceived: func() atomic.Value { diff --git a/plc4go/spi/pool/worker.go b/plc4go/spi/pool/worker.go index 60b95cbe6e..f2e93a03cb 100644 --- a/plc4go/spi/pool/worker.go +++ b/plc4go/spi/pool/worker.go @@ -31,7 +31,7 @@ import ( //go:generate go tool plc4xGenerator -type=worker type worker struct { - id int + id string executor interface { isTraceWorkers() bool getWorksItems() chan workItem @@ -50,7 +50,7 @@ type worker struct { log zerolog.Logger } -func newWorker(localLog zerolog.Logger, workerId int, executor interface { +func newWorker(localLog zerolog.Logger, workerId string, executor interface { isTraceWorkers() bool getWorksItems() chan workItem getWorkerWaitGroup() *sync.WaitGroup @@ -59,7 +59,7 @@ func newWorker(localLog zerolog.Logger, workerId int, executor interface { w := &worker{ id: workerId, executor: executor, - log: localLog.With().Int("workerId", workerId).Logger(), + log: localLog.With().Str("workerId", workerId).Logger(), } w.initialize() return w @@ -79,7 +79,7 @@ func (w *worker) start() { w.stateChange.Lock() defer w.stateChange.Unlock() if w.running.Load() { - w.log.Warn().Int("Worker id", w.id).Msg("Worker already started") + w.log.Warn().Msg("Worker already started") return } if w.executor.isTraceWorkers() { @@ -93,7 +93,7 @@ func (w *worker) stop(interrupt bool) { w.stateChange.Lock() defer w.stateChange.Unlock() if !w.running.Load() { - w.log.Warn().Int("Worker id", w.id).Msg("Worker not running") + w.log.Warn().Msg("Worker not running") return } diff --git a/plc4go/spi/pool/worker_plc4xgen.go b/plc4go/spi/pool/worker_plc4xgen.go index 91e4ba903b..bbadc1b8fa 100644 --- a/plc4go/spi/pool/worker_plc4xgen.go +++ b/plc4go/spi/pool/worker_plc4xgen.go @@ -49,7 +49,7 @@ func (d *worker) SerializeWithWriteBuffer(ctx context.Context, writeBuffer utils return err } - if err := writeBuffer.WriteInt64("id", 64, int64(d.id)); err != nil { + if err := writeBuffer.WriteString("id", uint32(len(d.id)*8), d.id); err != nil { return err } diff --git a/plc4go/spi/pool/worker_test.go b/plc4go/spi/pool/worker_test.go index 1e1a8060ab..a3abd9dfef 100644 --- a/plc4go/spi/pool/worker_test.go +++ b/plc4go/spi/pool/worker_test.go @@ -32,7 +32,7 @@ import ( func Test_worker_initialize(t *testing.T) { type fields struct { - id int + id string interrupter chan struct{} executor interface { isTraceWorkers() bool @@ -64,7 +64,7 @@ func Test_worker_initialize(t *testing.T) { func Test_worker_start(t *testing.T) { type fields struct { - id int + id string executor interface { isTraceWorkers() bool getWorksItems() chan workItem @@ -144,7 +144,7 @@ func Test_worker_start(t *testing.T) { func Test_worker_stop(t *testing.T) { type fields struct { - id int + id string executor interface { isTraceWorkers() bool getWorksItems() chan workItem @@ -210,7 +210,7 @@ func Test_worker_stop(t *testing.T) { func Test_worker_work(t *testing.T) { type fields struct { - id int + id string executor *executor } tests := []struct { @@ -226,7 +226,7 @@ func Test_worker_work(t *testing.T) { { name: "Worker should work till shutdown (even if it panics)", fields: fields{ - id: 0, + id: "0", executor: func() *executor { e := &executor{ workItems: make(chan workItem), @@ -262,7 +262,7 @@ func Test_worker_work(t *testing.T) { { name: "Worker should work till shutdown", fields: fields{ - id: 1, + id: "1", executor: func() *executor { e := &executor{ workItems: make(chan workItem), @@ -297,7 +297,7 @@ func Test_worker_work(t *testing.T) { { name: "Work interrupted", fields: fields{ - id: 1, + id: "1", executor: func() *executor { e := &executor{ workItems: make(chan workItem), @@ -322,7 +322,7 @@ func Test_worker_work(t *testing.T) { { name: "Work on canceled", fields: fields{ - id: 1, + id: "1", executor: func() *executor { e := &executor{ workItems: make(chan workItem), @@ -386,7 +386,7 @@ func Test_worker_work(t *testing.T) { func Test_worker_String(t *testing.T) { type fields struct { - id int + id string } tests := []struct { name string diff --git a/plc4go/spi/transactions/RequestTransactionManager_test.go b/plc4go/spi/transactions/RequestTransactionManager_test.go index 484894d660..8dc361bd9e 100644 --- a/plc4go/spi/transactions/RequestTransactionManager_test.go +++ b/plc4go/spi/transactions/RequestTransactionManager_test.go @@ -664,29 +664,26 @@ func Test_requestTransactionManager_String(t *testing.T) { traceTransactionManagerTransactions: true, }, want: ` -╔═requestTransactionManager═══════════════════════════════════════════════════════════════════════════════════════════╗ -║╔═runningRequests/value/requestTransaction╗╔═numberOfConcurrentRequests╗╔═currentTransactionId╗ ║ -║║ ╔═transactionId╗╔═completed╗ ║║ 0x0000000000000003 3 ║║ 0x00000004 4 ║ ║ -║║ ║ 0x00000002 2 ║║ b0 false ║ ║╚═══════════════════════════╝╚═════════════════════╝ ║ -║║ ╚══════════════╝╚══════════╝ ║ ║ -║╚═════════════════════════════════════════╝ ║ -║╔═executor/executor══════════════════════════════════════════════════════════════════════════════════════╗╔═shutdown╗║ -║║╔═running╗╔═shutdown╗ ║║b0 false ║║ -║║║b0 false║║b0 false ║ ║╚═════════╝║ -║║╚════════╝╚═════════╝ ║ ║ -║║╔═worker/value/worker══════════════════════════════════════════════════════════════════════════════════╗║ ║ -║║║╔═id═════════════════╗╔═lastReceived════════════════╗╔═running╗╔═shutdown╗╔═interrupted╗╔═interrupter╗║║ ║ -║║║║0x0000000000000000 0║║0001-01-01 00:00:00 +0000 UTC║║b0 false║║b0 false ║║ b0 false ║║0 element(s)║║║ ║ -║║║╚════════════════════╝╚═════════════════════════════╝╚════════╝╚═════════╝╚════════════╝╚════════════╝║║ ║ -║║╚══════════════════════════════════════════════════════════════════════════════════════════════════════╝║ ║ -║║╔═workItems══╗╔═traceWorkers╗ ║ ║ -║║║0 element(s)║║ b0 false ║ ║ ║ -║║╚════════════╝╚═════════════╝ ║ ║ -║╚════════════════════════════════════════════════════════════════════════════════════════════════════════╝ ║ -║╔═traceTransactionManagerTransactions╗ ║ -║║ b1 true ║ ║ -║╚════════════════════════════════════╝ ║ -╚═════════════════════════════════════════════════════════════════════════════════════════════════════════════════════╝`[1:], +╔═requestTransactionManager══════════════════════════════════════════════════════════════════════════════════════════╗ +║╔═runningRequests/value/requestTransaction╗╔═numberOfConcurrentRequests╗╔═currentTransactionId╗ ║ +║║ ╔═transactionId╗╔═completed╗ ║║ 0x0000000000000003 3 ║║ 0x00000004 4 ║ ║ +║║ ║ 0x00000002 2 ║║ b0 false ║ ║╚═══════════════════════════╝╚═════════════════════╝ ║ +║║ ╚══════════════╝╚══════════╝ ║ ║ +║╚═════════════════════════════════════════╝ ║ +║╔═executor/executor════════════════════════════════════════════════════════════════════════════════════════════════╗║ +║║╔═running╗╔═shutdown╗╔═worker/value/worker═══════════════════════════════════════════════════════════════════════╗║║ +║║║b0 false║║b0 false ║║╔═id══════╗╔═lastReceived════════════════╗╔═running╗╔═shutdown╗╔═interrupted╗╔═interrupter╗║║║ +║║╚════════╝╚═════════╝║║-worker-0║║0001-01-01 00:00:00 +0000 UTC║║b0 false║║b0 false ║║ b0 false ║║0 element(s)║║║║ +║║ ║╚═════════╝╚═════════════════════════════╝╚════════╝╚═════════╝╚════════════╝╚════════════╝║║║ +║║ ╚═══════════════════════════════════════════════════════════════════════════════════════════╝║║ +║║╔═workerNumber╗╔═workItems══╗╔═traceWorkers╗╔═ctx═════════════════════════╗╔═ctxCancel╗ ║║ +║║║0x00000000 0 ║║0 element(s)║║ b0 false ║║context.Background.WithCancel║║ 0xc4760 ║ ║║ +║║╚═════════════╝╚════════════╝╚═════════════╝╚═════════════════════════════╝╚══════════╝ ║║ +║╚══════════════════════════════════════════════════════════════════════════════════════════════════════════════════╝║ +║╔═shutdown╗╔═traceTransactionManagerTransactions╗ ║ +║║b0 false ║║ b1 true ║ ║ +║╚═════════╝╚════════════════════════════════════╝ ║ +╚════════════════════════════════════════════════════════════════════════════════════════════════════════════════════╝`[1:], }, } for _, tt := range tests {
