This is an automated email from the ASF dual-hosted git repository.

sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git


The following commit(s) were added to refs/heads/develop by this push:
     new 878c4084e8 fix: unified wait group/go func usage
878c4084e8 is described below

commit 878c4084e8ccfe62ef7895432eca5a3694912d13
Author: Sebastian Rühl <[email protected]>
AuthorDate: Fri Nov 7 13:17:11 2025 +0100

    fix: unified wait group/go func usage
    
    feat: added ctx to executor
---
 plc4go/internal/ads/Browser.go                     |  6 +--
 plc4go/internal/ads/Connection.go                  | 12 ++---
 plc4go/internal/ads/Discoverer.go                  |  6 +--
 plc4go/internal/ads/Interactions.go                | 36 +++++--------
 plc4go/internal/ads/Reader.go                      | 12 ++---
 plc4go/internal/ads/Subscriber.go                  | 12 ++---
 plc4go/internal/ads/Writer.go                      | 12 ++---
 .../bacnetip/ApplicationLayerMessageCodec.go       | 24 +++------
 plc4go/internal/bacnetip/Connection.go             | 12 ++---
 plc4go/internal/bacnetip/Discoverer.go             | 20 +++-----
 plc4go/internal/bacnetip/Reader.go                 | 10 ++--
 plc4go/internal/bacnetip/Subscriber.go             |  6 +--
 plc4go/internal/bacnetip/bacgopes/core/core.go     | 11 ++--
 .../internal/bacnetip/bacgopes/iocb/iocb_IOCB.go   |  6 +--
 .../bacnetip/bacgopes/iocb/iocb_IOQueue.go         |  6 +--
 .../bacnetip/bacgopes/udp/udp_UDPDirector.go       | 18 +++----
 plc4go/internal/cbus/Browser.go                    |  6 +--
 plc4go/internal/cbus/CBusMessageMapper_test.go     | 45 +++++++++--------
 plc4go/internal/cbus/Connection.go                 | 28 +++-------
 plc4go/internal/cbus/Connection_test.go            | 12 ++---
 plc4go/internal/cbus/Reader.go                     |  4 +-
 plc4go/internal/cbus/Subscriber.go                 |  6 +--
 plc4go/internal/cbus/Writer.go                     | 10 ++--
 plc4go/internal/eip/Connection.go                  | 12 ++---
 plc4go/internal/eip/Reader.go                      | 10 ++--
 plc4go/internal/eip/Writer.go                      |  6 +--
 plc4go/internal/knxnetip/Connection.go             | 24 +++------
 .../knxnetip/ConnectionDriverSpecificOperations.go | 42 +++++----------
 plc4go/internal/knxnetip/ConnectionHelper.go       | 10 ++--
 plc4go/internal/knxnetip/Discoverer.go             | 26 +++++-----
 plc4go/internal/knxnetip/Reader.go                 |  6 +--
 plc4go/internal/knxnetip/Subscriber.go             |  6 +--
 plc4go/internal/modbus/AsciiDriver.go              |  6 +--
 plc4go/internal/modbus/Connection.go               |  6 +--
 plc4go/internal/modbus/Reader.go                   |  6 +--
 plc4go/internal/modbus/RtuDriver.go                |  6 +--
 plc4go/internal/modbus/TcpDriver.go                |  6 +--
 plc4go/internal/modbus/Writer.go                   |  6 +--
 plc4go/internal/opcua/Connection.go                | 12 ++---
 plc4go/internal/opcua/SecureChannel.go             |  8 +--
 plc4go/internal/opcua/SubscriptionHandle.go        |  8 +--
 plc4go/internal/s7/Connection.go                   |  6 +--
 plc4go/internal/s7/Reader.go                       | 10 ++--
 plc4go/internal/s7/Writer.go                       | 10 ++--
 plc4go/internal/simulated/Connection.go            | 18 +++----
 plc4go/internal/simulated/Connection_test.go       |  7 ++-
 plc4go/internal/simulated/Reader.go                |  6 +--
 plc4go/internal/simulated/Writer.go                |  6 +--
 plc4go/pkg/api/cache/PlcConnectionCache.go         | 44 +++++++---------
 plc4go/pkg/api/cache/PlcConnectionCache_test.go    | 23 ++++-----
 plc4go/pkg/api/cache/connectionContainer.go        | 12 ++---
 plc4go/pkg/api/cache/plcConnectionLease.go         |  6 +--
 plc4go/spi/default/DefaultBrowser.go               |  6 +--
 plc4go/spi/default/DefaultCodec.go                 | 22 ++++----
 plc4go/spi/default/DefaultCodec_test.go            |  7 ++-
 plc4go/spi/default/DefaultConnection.go            | 12 ++---
 plc4go/spi/model/DefaultPlcReadRequest.go          |  6 +--
 .../spi/model/DefaultPlcUnsubscriptionRequest.go   |  6 +--
 plc4go/spi/model/DefaultPlcWriteRequest.go         |  6 +--
 plc4go/spi/pool/WorkerPool.go                      |  2 +-
 plc4go/spi/pool/WorkerPool_test.go                 | 18 +++++--
 plc4go/spi/pool/dynamicExecutor.go                 | 22 +++-----
 plc4go/spi/pool/dynamicExecutor_test.go            | 11 ++--
 plc4go/spi/pool/executor.go                        | 10 ++++
 plc4go/spi/pool/executor_plc4xgen.go               | 38 ++++++++++++++
 plc4go/spi/pool/executor_test.go                   | 16 ++++--
 plc4go/spi/pool/worker.go                          | 12 +++--
 plc4go/spi/pool/worker_test.go                     | 40 ++++++++++-----
 plc4go/spi/transactions/RequestTransaction.go      |  4 +-
 .../spi/transactions/RequestTransactionManager.go  | 15 ++++--
 .../transactions/RequestTransactionManager_test.go | 59 +++++++++++++++-------
 plc4go/spi/transactions/RequestTransaction_test.go | 25 +++++----
 plc4go/spi/transports/pcap/TransportInstance.go    |  7 ++-
 .../spi/transports/tcp/TransportInstance_test.go   | 19 ++++---
 plc4go/spi/transports/udp/TransportInstance.go     |  4 +-
 .../utils/DefaultBufferedTransportInstance.go      |  6 +--
 plc4go/spi/utils/StopWarn.go                       |  6 +--
 77 files changed, 492 insertions(+), 561 deletions(-)

diff --git a/plc4go/internal/ads/Browser.go b/plc4go/internal/ads/Browser.go
index 9044702b1c..a301e28c52 100644
--- a/plc4go/internal/ads/Browser.go
+++ b/plc4go/internal/ads/Browser.go
@@ -44,9 +44,7 @@ func (m *Connection) Browse(ctx context.Context, 
browseRequest apiModel.PlcBrows
 
 func (m *Connection) BrowseWithInterceptor(ctx context.Context, browseRequest 
apiModel.PlcBrowseRequest, interceptor func(result apiModel.PlcBrowseItem) 
bool) <-chan apiModel.PlcBrowseRequestResult {
        result := make(chan apiModel.PlcBrowseRequestResult, 1)
-       m.wg.Add(1)
-       go func() {
-               defer m.wg.Done()
+       m.wg.Go(func() {
                defer func() {
                        if err := recover(); err != nil {
                                result <- 
spiModel.NewDefaultPlcBrowseRequestResult(browseRequest, nil, 
errors.Errorf("panic-ed %v. Stack: %s", err, debug.Stack()))
@@ -60,7 +58,7 @@ func (m *Connection) BrowseWithInterceptor(ctx 
context.Context, browseRequest ap
                }
                browseResponse := 
spiModel.NewDefaultPlcBrowseResponse(browseRequest, results, responseCodes)
                result <- 
spiModel.NewDefaultPlcBrowseRequestResult(browseRequest, browseResponse, nil)
-       }()
+       })
        return result
 }
 
diff --git a/plc4go/internal/ads/Connection.go 
b/plc4go/internal/ads/Connection.go
index eaf4ed3ff1..23dfae6149 100644
--- a/plc4go/internal/ads/Connection.go
+++ b/plc4go/internal/ads/Connection.go
@@ -119,9 +119,7 @@ func (m *Connection) ConnectWithContext(ctx 
context.Context) <-chan plc4go.PlcCo
 
        // Reset the driver context (Actually this should not be required, but 
just to be on the safe side)
        m.driverContext.clear()
-       m.wg.Add(1)
-       go func() {
-               defer m.wg.Done()
+       m.wg.Go(func() {
                defer func() {
                        if err := recover(); err != nil {
                                ch <- 
_default.NewDefaultPlcConnectionCloseResult(nil, errors.Errorf("panic-ed %v. 
Stack: %s", err, debug.Stack()))
@@ -133,7 +131,7 @@ func (m *Connection) ConnectWithContext(ctx 
context.Context) <-chan plc4go.PlcCo
                }
 
                m.setupConnection(ctx, ch)
-       }()
+       })
        return ch
 }
 
@@ -178,9 +176,7 @@ func (m *Connection) setupConnection(ctx context.Context, 
ch chan plc4go.PlcConn
        // Start the worker for handling incoming messages
        // (Messages that are not responses to outgoing messages)
        defaultIncomingMessageChannel := 
m.messageCodec.GetDefaultIncomingMessageChannel()
-       m.wg.Add(1)
-       go func() {
-               defer m.wg.Done()
+       m.wg.Go(func() {
                defer func() {
                        if err := recover(); err != nil {
                                m.log.Error().
@@ -206,7 +202,7 @@ func (m *Connection) setupConnection(ctx context.Context, 
ch chan plc4go.PlcConn
                        }
                }
                m.log.Info().Msg("Done waiting for messages ...")
-       }()
+       })
 
        // Subscribe for changes to the symbol or the offline-versions
        versionChangeRequest, err := m.SubscriptionRequestBuilder().
diff --git a/plc4go/internal/ads/Discoverer.go 
b/plc4go/internal/ads/Discoverer.go
index bd525dbaff..b8c33c3838 100644
--- a/plc4go/internal/ads/Discoverer.go
+++ b/plc4go/internal/ads/Discoverer.go
@@ -167,9 +167,7 @@ func (d *Discoverer) Discover(ctx context.Context, callback 
func(event apiModel.
                discoveryItem.socket = socket
 
                // Start a worker to receive responses
-               d.wg.Add(1)
-               go func(discoveryItem *discovery) {
-                       defer d.wg.Done()
+               d.wg.Go(func() {
                        defer func() {
                                if err := recover(); err != nil {
                                        d.log.Error().
@@ -271,7 +269,7 @@ func (d *Discoverer) Discover(ctx context.Context, callback 
func(event apiModel.
                                        callback(plcDiscoveryItem)
                                }
                        }
-               }(discoveryItem)
+               })
        }
        defer func() {
                for _, discoveryItem := range discoveryItems {
diff --git a/plc4go/internal/ads/Interactions.go 
b/plc4go/internal/ads/Interactions.go
index 25dee5805c..d578ee21bb 100644
--- a/plc4go/internal/ads/Interactions.go
+++ b/plc4go/internal/ads/Interactions.go
@@ -32,9 +32,7 @@ import (
 
 func (m *Connection) ExecuteAdsReadDeviceInfoRequest(ctx context.Context) 
(model.AdsReadDeviceInfoResponse, error) {
        responseChannel := make(chan model.AdsReadDeviceInfoResponse, 1)
-       m.wg.Add(1)
-       go func() {
-               defer m.wg.Done()
+       m.wg.Go(func() {
                defer func() {
                        if err := recover(); err != nil {
                                m.log.Error().
@@ -67,7 +65,7 @@ func (m *Connection) ExecuteAdsReadDeviceInfoRequest(ctx 
context.Context) (model
                        m.log.Debug().Err(err).Msg("error during send request")
                        close(responseChannel)
                }
-       }()
+       })
        response, err := ReadWithTimeout(ctx, responseChannel)
        if err != nil {
                return nil, fmt.Errorf("error reading device info: %v", err)
@@ -77,9 +75,7 @@ func (m *Connection) ExecuteAdsReadDeviceInfoRequest(ctx 
context.Context) (model
 
 func (m *Connection) ExecuteAdsReadRequest(ctx context.Context, indexGroup 
uint32, indexOffset uint32, length uint32) (model.AdsReadResponse, error) {
        responseChannel := make(chan model.AdsReadResponse, 1)
-       m.wg.Add(1)
-       go func() {
-               defer m.wg.Done()
+       m.wg.Go(func() {
                defer func() {
                        if err := recover(); err != nil {
                                m.log.Error().
@@ -112,7 +108,7 @@ func (m *Connection) ExecuteAdsReadRequest(ctx 
context.Context, indexGroup uint3
                        m.log.Debug().Err(err).Msg("error during send request")
                        close(responseChannel)
                }
-       }()
+       })
        response, err := ReadWithTimeout(ctx, responseChannel)
        if err != nil {
                return nil, fmt.Errorf("error reading: %v", err)
@@ -122,9 +118,7 @@ func (m *Connection) ExecuteAdsReadRequest(ctx 
context.Context, indexGroup uint3
 
 func (m *Connection) ExecuteAdsWriteRequest(ctx context.Context, indexGroup 
uint32, indexOffset uint32, data []byte) (model.AdsWriteResponse, error) {
        responseChannel := make(chan model.AdsWriteResponse, 1)
-       m.wg.Add(1)
-       go func() {
-               defer m.wg.Done()
+       m.wg.Go(func() {
                defer func() {
                        if err := recover(); err != nil {
                                m.log.Error().
@@ -157,7 +151,7 @@ func (m *Connection) ExecuteAdsWriteRequest(ctx 
context.Context, indexGroup uint
                        m.log.Debug().Err(err).Msg("error during send request")
                        close(responseChannel)
                }
-       }()
+       })
        response, err := ReadWithTimeout(ctx, responseChannel)
        if err != nil {
                return nil, fmt.Errorf("error writing: %v", err)
@@ -167,9 +161,7 @@ func (m *Connection) ExecuteAdsWriteRequest(ctx 
context.Context, indexGroup uint
 
 func (m *Connection) ExecuteAdsReadWriteRequest(ctx context.Context, 
indexGroup uint32, indexOffset uint32, readLength uint32, items 
[]model.AdsMultiRequestItem, writeData []byte) (model.AdsReadWriteResponse, 
error) {
        responseChannel := make(chan model.AdsReadWriteResponse, 1)
-       m.wg.Add(1)
-       go func() {
-               defer m.wg.Done()
+       m.wg.Go(func() {
                defer func() {
                        if err := recover(); err != nil {
                                m.log.Error().
@@ -202,7 +194,7 @@ func (m *Connection) ExecuteAdsReadWriteRequest(ctx 
context.Context, indexGroup
                        m.log.Debug().Err(err).Msg("error during send request")
                        close(responseChannel)
                }
-       }()
+       })
        response, err := ReadWithTimeout(ctx, responseChannel)
        if err != nil {
                return nil, fmt.Errorf("error writing: %v", err)
@@ -212,9 +204,7 @@ func (m *Connection) ExecuteAdsReadWriteRequest(ctx 
context.Context, indexGroup
 
 func (m *Connection) ExecuteAdsAddDeviceNotificationRequest(ctx 
context.Context, indexGroup uint32, indexOffset uint32, length uint32, 
transmissionMode model.AdsTransMode, maxDelay uint32, cycleTime uint32) 
(model.AdsAddDeviceNotificationResponse, error) {
        responseChannel := make(chan model.AdsAddDeviceNotificationResponse, 1)
-       m.wg.Add(1)
-       go func() {
-               defer m.wg.Done()
+       m.wg.Go(func() {
                defer func() {
                        if err := recover(); err != nil {
                                m.log.Error().
@@ -247,7 +237,7 @@ func (m *Connection) 
ExecuteAdsAddDeviceNotificationRequest(ctx context.Context,
                        m.log.Debug().Err(err).Msg("error during send request")
                        close(responseChannel)
                }
-       }()
+       })
        response, err := ReadWithTimeout(ctx, responseChannel)
        if err != nil {
                return nil, fmt.Errorf("error writing: %v", err)
@@ -257,9 +247,7 @@ func (m *Connection) 
ExecuteAdsAddDeviceNotificationRequest(ctx context.Context,
 
 func (m *Connection) ExecuteAdsDeleteDeviceNotificationRequest(ctx 
context.Context, notificationHandle uint32) 
(model.AdsDeleteDeviceNotificationResponse, error) {
        responseChannel := make(chan model.AdsDeleteDeviceNotificationResponse, 
1)
-       m.wg.Add(1)
-       go func() {
-               defer m.wg.Done()
+       m.wg.Go(func() {
                defer func() {
                        if err := recover(); err != nil {
                                m.log.Error().
@@ -292,7 +280,7 @@ func (m *Connection) 
ExecuteAdsDeleteDeviceNotificationRequest(ctx context.Conte
                        m.log.Debug().Err(err).Msg("error during send request")
                        close(responseChannel)
                }
-       }()
+       })
        response, err := ReadWithTimeout(ctx, responseChannel)
        if err != nil {
                return nil, fmt.Errorf("error writing: %v", err)
diff --git a/plc4go/internal/ads/Reader.go b/plc4go/internal/ads/Reader.go
index 97e0971714..2888c185bb 100644
--- a/plc4go/internal/ads/Reader.go
+++ b/plc4go/internal/ads/Reader.go
@@ -44,9 +44,7 @@ func (m *Connection) ReadRequestBuilder() 
apiModel.PlcReadRequestBuilder {
 func (m *Connection) Read(ctx context.Context, readRequest 
apiModel.PlcReadRequest) <-chan apiModel.PlcReadRequestResult {
        m.log.Trace().Msg("Reading")
        result := make(chan apiModel.PlcReadRequestResult, 1)
-       m.wg.Add(1)
-       go func() {
-               defer m.wg.Done()
+       m.wg.Go(func() {
                defer func() {
                        if err := recover(); err != nil {
                                result <- 
spiModel.NewDefaultPlcReadRequestResult(readRequest, nil, 
errors.Errorf("panic-ed %v. Stack: %s", err, debug.Stack()))
@@ -57,7 +55,7 @@ func (m *Connection) Read(ctx context.Context, readRequest 
apiModel.PlcReadReque
                } else {
                        m.multiRead(ctx, readRequest, result)
                }
-       }()
+       })
        return result
 }
 
@@ -97,9 +95,7 @@ func (m *Connection) singleRead(ctx context.Context, 
readRequest apiModel.PlcRea
                return
        }
 
-       m.wg.Add(1)
-       go func() {
-               defer m.wg.Done()
+       m.wg.Go(func() {
                defer func() {
                        if err := recover(); err != nil {
                                result <- 
spiModel.NewDefaultPlcReadRequestResult(readRequest, nil, 
errors.Errorf("panic-ed %v. Stack: %s", err, debug.Stack()))
@@ -140,7 +136,7 @@ func (m *Connection) singleRead(ctx context.Context, 
readRequest apiModel.PlcRea
                        spiModel.NewDefaultPlcReadResponse(readRequest, 
responseCodes, plcValues),
                        nil,
                )
-       }()
+       })
 }
 
 func (m *Connection) multiRead(ctx context.Context, readRequest 
apiModel.PlcReadRequest, result chan apiModel.PlcReadRequestResult) {
diff --git a/plc4go/internal/ads/Subscriber.go 
b/plc4go/internal/ads/Subscriber.go
index 2a71df4873..8ab5f0ffae 100644
--- a/plc4go/internal/ads/Subscriber.go
+++ b/plc4go/internal/ads/Subscriber.go
@@ -101,9 +101,7 @@ func (m *Connection) Subscribe(ctx context.Context, 
subscriptionRequest apiModel
 
        // Create a new result-channel, which completes as soon as all 
sub-result-channels have returned
        globalResultChannel := make(chan apiModel.PlcSubscriptionRequestResult, 
1)
-       m.wg.Add(1)
-       go func() {
-               defer m.wg.Done()
+       m.wg.Go(func() {
                defer func() {
                        if err := recover(); err != nil {
                                m.log.Error().
@@ -128,16 +126,14 @@ func (m *Connection) Subscribe(ctx context.Context, 
subscriptionRequest apiModel
                result := m.processSubscriptionResponses(ctx, 
subscriptionRequest, subResults)
                // Return the final result
                globalResultChannel <- result
-       }()
+       })
 
        return globalResultChannel
 }
 
 func (m *Connection) subscribe(ctx context.Context, subscriptionRequest 
apiModel.PlcSubscriptionRequest) <-chan apiModel.PlcSubscriptionRequestResult {
        responseChan := make(chan apiModel.PlcSubscriptionRequestResult, 1)
-       m.wg.Add(1)
-       go func() {
-               defer m.wg.Done()
+       m.wg.Go(func() {
                defer func() {
                        if err := recover(); err != nil {
                                responseChan <- 
spiModel.NewDefaultPlcSubscriptionRequestResult(subscriptionRequest, nil, 
errors.Errorf("panic-ed %v. Stack: %s", err, debug.Stack()))
@@ -177,7 +173,7 @@ func (m *Connection) subscribe(ctx context.Context, 
subscriptionRequest apiModel
                )
                // Store it together with the returned ADS handle.
                m.subscriptions[response.GetNotificationHandle()] = 
subscriptionHandle
-       }()
+       })
        return responseChan
 }
 
diff --git a/plc4go/internal/ads/Writer.go b/plc4go/internal/ads/Writer.go
index 35e6cfc778..025c8d9ce6 100644
--- a/plc4go/internal/ads/Writer.go
+++ b/plc4go/internal/ads/Writer.go
@@ -43,9 +43,7 @@ func (m *Connection) WriteRequestBuilder() 
apiModel.PlcWriteRequestBuilder {
 func (m *Connection) Write(ctx context.Context, writeRequest 
apiModel.PlcWriteRequest) <-chan apiModel.PlcWriteRequestResult {
        m.log.Trace().Msg("Writing")
        result := make(chan apiModel.PlcWriteRequestResult, 1)
-       m.wg.Add(1)
-       go func() {
-               defer m.wg.Done()
+       m.wg.Go(func() {
                defer func() {
                        if err := recover(); err != nil {
                                result <- 
spiModel.NewDefaultPlcWriteRequestResult(writeRequest, nil, 
errors.Errorf("panic-ed %v. Stack: %s", err, debug.Stack()))
@@ -56,7 +54,7 @@ func (m *Connection) Write(ctx context.Context, writeRequest 
apiModel.PlcWriteRe
                } else {
                        m.multiWrite(ctx, writeRequest, result)
                }
-       }()
+       })
        return result
 }
 
@@ -106,9 +104,7 @@ func (m *Connection) singleWrite(ctx context.Context, 
writeRequest apiModel.PlcW
        }
        data := io.GetBytes()
 
-       m.wg.Add(1)
-       go func() {
-               defer m.wg.Done()
+       m.wg.Go(func() {
                defer func() {
                        if err := recover(); err != nil {
                                result <- 
spiModel.NewDefaultPlcWriteRequestResult(writeRequest, nil, 
errors.Errorf("panic-ed %v. Stack: %s", err, debug.Stack()))
@@ -133,7 +129,7 @@ func (m *Connection) singleWrite(ctx context.Context, 
writeRequest apiModel.PlcW
                }
                // Return the response to the caller.
                result <- 
spiModel.NewDefaultPlcWriteRequestResult(writeRequest, 
spiModel.NewDefaultPlcWriteResponse(writeRequest, responseCodes), nil)
-       }()
+       })
 }
 
 func (m *Connection) multiWrite(ctx context.Context, writeRequest 
apiModel.PlcWriteRequest, result chan apiModel.PlcWriteRequestResult) {
diff --git a/plc4go/internal/bacnetip/ApplicationLayerMessageCodec.go 
b/plc4go/internal/bacnetip/ApplicationLayerMessageCodec.go
index a96285b819..9ca3f3375c 100644
--- a/plc4go/internal/bacnetip/ApplicationLayerMessageCodec.go
+++ b/plc4go/internal/bacnetip/ApplicationLayerMessageCodec.go
@@ -128,16 +128,12 @@ func (m *ApplicationLayerMessageCodec) Send(message 
spi.Message) error {
        if err != nil {
                return errors.Wrap(err, "error creating IOCB")
        }
-       m.wg.Add(1)
-       go func() {
-               defer m.wg.Done()
-               m.wg.Add(1)
-               go func() {
-                       defer m.wg.Done()
+       m.wg.Go(func() {
+               m.wg.Go(func() {
                        if err := m.bipSimpleApplication.RequestIO(iocb); err 
!= nil {
                                m.log.Debug().Err(err).Msg("errored")
                        }
-               }()
+               })
                iocb.Wait()
                if err := iocb.GetIOError(); err != nil {
                        // TODO: handle error
@@ -148,7 +144,7 @@ func (m *ApplicationLayerMessageCodec) Send(message 
spi.Message) error {
                } else {
                        // TODO: what now?
                }
-       }()
+       })
        return nil
 }
 
@@ -166,16 +162,12 @@ func (m *ApplicationLayerMessageCodec) SendRequest(ctx 
context.Context, message
        if err != nil {
                return errors.Wrap(err, "error creating IOCB")
        }
-       m.wg.Add(1)
-       go func() {
-               defer m.wg.Done()
-               m.wg.Add(1)
-               go func() {
-                       defer m.wg.Done()
+       m.wg.Go(func() {
+               m.wg.Go(func() {
                        if err := m.bipSimpleApplication.RequestIO(iocb); err 
!= nil {
                                m.log.Error().Err(err).Msg("errored")
                        }
-               }()
+               })
                iocb.Wait()
                if err := iocb.GetIOError(); err != nil {
                        if err := handleError(err); err != nil {
@@ -216,7 +208,7 @@ func (m *ApplicationLayerMessageCodec) SendRequest(ctx 
context.Context, message
                } else {
                        // TODO: what now?
                }
-       }()
+       })
        return nil
 }
 
diff --git a/plc4go/internal/bacnetip/Connection.go 
b/plc4go/internal/bacnetip/Connection.go
index 26db0bfeae..6e5af988de 100644
--- a/plc4go/internal/bacnetip/Connection.go
+++ b/plc4go/internal/bacnetip/Connection.go
@@ -91,18 +91,14 @@ func (c *Connection) GetTracer() tracer.Tracer {
 func (c *Connection) ConnectWithContext(ctx context.Context) <-chan 
plc4go.PlcConnectionConnectResult {
        c.log.Trace().Msg("Connecting")
        ch := make(chan plc4go.PlcConnectionConnectResult, 1)
-       c.wg.Add(1)
-       go func() {
-               defer c.wg.Done()
+       c.wg.Go(func() {
                defer func() {
                        if err := recover(); err != nil {
                                ch <- 
_default.NewDefaultPlcConnectionConnectResult(nil, errors.Errorf("panic-ed %v. 
Stack: %s", err, debug.Stack()))
                        }
                }()
                connectionConnectResult := 
<-c.DefaultConnection.ConnectWithContext(ctx)
-               c.wg.Add(1)
-               go func() {
-                       defer c.wg.Done()
+               c.wg.Go(func() {
                        defer func() {
                                if err := recover(); err != nil {
                                        ch <- 
_default.NewDefaultPlcConnectionConnectResult(nil, errors.Errorf("panic-ed %v. 
Stack: %s", err, debug.Stack()))
@@ -113,9 +109,9 @@ func (c *Connection) ConnectWithContext(ctx 
context.Context) <-chan plc4go.PlcCo
                                c.passToDefaultIncomingMessageChannel()
                        }
                        c.log.Info().Msg("Ending incoming message transfer")
-               }()
+               })
                ch <- connectionConnectResult
-       }()
+       })
        return ch
 }
 
diff --git a/plc4go/internal/bacnetip/Discoverer.go 
b/plc4go/internal/bacnetip/Discoverer.go
index 7c52d29e83..8da8336889 100644
--- a/plc4go/internal/bacnetip/Discoverer.go
+++ b/plc4go/internal/bacnetip/Discoverer.go
@@ -167,16 +167,14 @@ func (d *Discoverer) broadcastAndDiscover(ctx 
context.Context, communicationChan
                        }
                }
 
-               go func(communicationChannelInstance communicationChannel) {
+               d.wg.Go(func() {
                        for {
                                if err := ctx.Err(); err != nil {
                                        d.log.Debug().Err(err).Msg("ending")
                                        return
                                }
                                blockingReadChan := make(chan bool)
-                               d.wg.Add(1)
-                               go func() {
-                                       defer d.wg.Done()
+                               d.wg.Go(func() {
                                        buf := make([]byte, 4096)
                                        n, addr, err := 
communicationChannelInstance.unicastConnection.ReadFrom(buf)
                                        if err != nil {
@@ -194,7 +192,7 @@ func (d *Discoverer) broadcastAndDiscover(ctx 
context.Context, communicationChan
                                        }
                                        incomingBVLCChannel <- 
receivedBvlcMessage{incomingBvlc, addr}
                                        blockingReadChan <- true
-                               }()
+                               })
                                select {
                                case ok := <-blockingReadChan:
                                        if !ok {
@@ -207,18 +205,16 @@ func (d *Discoverer) broadcastAndDiscover(ctx 
context.Context, communicationChan
                                        return
                                }
                        }
-               }(communicationChannelInstance)
+               })
 
-               go func(communicationChannelInstance communicationChannel) {
+               d.wg.Go(func() {
                        for {
                                if err := ctx.Err(); err != nil {
                                        d.log.Debug().Err(err).Msg("ending")
                                        return
                                }
                                blockingReadChan := make(chan bool)
-                               d.wg.Add(1)
-                               go func() {
-                                       defer d.wg.Done()
+                               d.wg.Go(func() {
                                        buf := make([]byte, 4096)
                                        n, addr, err := 
communicationChannelInstance.broadcastConnection.ReadFrom(buf)
                                        if err != nil {
@@ -235,7 +231,7 @@ func (d *Discoverer) broadcastAndDiscover(ctx 
context.Context, communicationChan
                                        }
                                        incomingBVLCChannel <- 
receivedBvlcMessage{incomingBvlc, addr}
                                        blockingReadChan <- true
-                               }()
+                               })
                                select {
                                case ok := <-blockingReadChan:
                                        if !ok {
@@ -248,7 +244,7 @@ func (d *Discoverer) broadcastAndDiscover(ctx 
context.Context, communicationChan
                                        return
                                }
                        }
-               }(communicationChannelInstance)
+               })
        }
        return incomingBVLCChannel, nil
 }
diff --git a/plc4go/internal/bacnetip/Reader.go 
b/plc4go/internal/bacnetip/Reader.go
index 447fc727e9..467fc03dba 100644
--- a/plc4go/internal/bacnetip/Reader.go
+++ b/plc4go/internal/bacnetip/Reader.go
@@ -69,9 +69,7 @@ func (m *Reader) Read(ctx context.Context, readRequest 
apiModel.PlcReadRequest)
        // TODO: handle ctx
        m.log.Trace().Msg("Reading")
        result := make(chan apiModel.PlcReadRequestResult, 1)
-       m.wg.Add(1)
-       go func() {
-               defer m.wg.Done()
+       m.wg.Go(func() {
                if len(readRequest.GetTagNames()) == 0 {
                        result <- 
spiModel.NewDefaultPlcReadRequestResult(readRequest, nil, errors.New("at least 
one field required"))
                        return
@@ -138,7 +136,9 @@ func (m *Reader) Read(ctx context.Context, readRequest 
apiModel.PlcReadRequest)
 
                // Start a new request-transaction (Is ended in the 
response-handler)
                transaction := m.tm.StartTransaction()
-               transaction.Submit(func(transaction 
transactions.RequestTransaction) {
+               transaction.Submit(func(transactionContext context.Context, 
transaction transactions.RequestTransaction) {
+                       ctx, cancel := context.WithCancel(ctx)
+                       context.AfterFunc(transactionContext, cancel)
                        // Send the  over the wire
                        m.log.Trace().Msg("Send ")
                        if err := m.messageCodec.SendRequest(ctx, apdu, 
func(message spi.Message) bool {
@@ -206,7 +206,7 @@ func (m *Reader) Read(ctx context.Context, readRequest 
apiModel.PlcReadRequest)
                                }
                        }
                })
-       }()
+       })
        return result
 }
 
diff --git a/plc4go/internal/bacnetip/Subscriber.go 
b/plc4go/internal/bacnetip/Subscriber.go
index b95913219d..280ae2fae6 100644
--- a/plc4go/internal/bacnetip/Subscriber.go
+++ b/plc4go/internal/bacnetip/Subscriber.go
@@ -55,9 +55,7 @@ func NewSubscriber(connection *Connection, _options 
...options.WithOption) *Subs
 
 func (m *Subscriber) Subscribe(ctx context.Context, subscriptionRequest 
apiModel.PlcSubscriptionRequest) <-chan apiModel.PlcSubscriptionRequestResult {
        result := make(chan apiModel.PlcSubscriptionRequestResult, 1)
-       m.wg.Add(1)
-       go func() {
-               defer m.wg.Done()
+       m.wg.Go(func() {
                internalPlcSubscriptionRequest := 
subscriptionRequest.(*spiModel.DefaultPlcSubscriptionRequest)
 
                // Add this subscriber to the connection.
@@ -85,7 +83,7 @@ func (m *Subscriber) Subscribe(ctx context.Context, 
subscriptionRequest apiModel
                        ),
                        nil,
                )
-       }()
+       })
        return result
 }
 
diff --git a/plc4go/internal/bacnetip/bacgopes/core/core.go 
b/plc4go/internal/bacnetip/bacgopes/core/core.go
index bcec235c46..c7b29c1b7e 100644
--- a/plc4go/internal/bacnetip/bacgopes/core/core.go
+++ b/plc4go/internal/bacnetip/bacgopes/core/core.go
@@ -51,18 +51,14 @@ var wg sync.WaitGroup // use to track spawned go routines
 
 func run() {
        running = true
-       wg.Add(1)
-       go func() {
-               defer wg.Done()
+       wg.Go(func() {
                c := make(chan os.Signal, 1)
                signal.Notify(c, os.Interrupt, syscall.SIGTERM)
 
                <-c
                running = false
-       }()
-       wg.Add(1)
-       go func() {
-               defer wg.Done()
+       })
+       wg.Go(func() {
                for running {
                        // get the next task
                        var delta time.Duration
@@ -112,7 +108,7 @@ func run() {
                                }
                        }
                }
-       }()
+       })
 }
 
 
 // RunOnce makes a pass through the scheduled tasks and deferred functions just
diff --git a/plc4go/internal/bacnetip/bacgopes/iocb/iocb_IOCB.go 
b/plc4go/internal/bacnetip/bacgopes/iocb/iocb_IOCB.go
index 5efdffb02c..d8ffd151ff 100644
--- a/plc4go/internal/bacnetip/bacgopes/iocb/iocb_IOCB.go
+++ b/plc4go/internal/bacnetip/bacgopes/iocb/iocb_IOCB.go
@@ -251,15 +251,13 @@ func (i *IOCB) SetTimeout(delay time.Duration) {
                now := GetTaskManagerTime()
                i.ioTimeout = time.NewTimer(delay)
                i.ioTimoutCancel = make(chan struct{})
-               i.wg.Add(1)
-               go func() {
-                       defer i.wg.Done()
+               i.wg.Go(func() {
                        select {
                        case timeout := <-i.ioTimeout.C:
                                _ = 
i.Abort(utils.NewTimeoutError(now.Sub(timeout)))
                        case <-i.ioTimoutCancel:
                        }
-               }()
+               })
        }
 }
 
diff --git a/plc4go/internal/bacnetip/bacgopes/iocb/iocb_IOQueue.go 
b/plc4go/internal/bacnetip/bacgopes/iocb/iocb_IOQueue.go
index 440e7afc26..76f9720cd3 100644
--- a/plc4go/internal/bacnetip/bacgopes/iocb/iocb_IOQueue.go
+++ b/plc4go/internal/bacnetip/bacgopes/iocb/iocb_IOQueue.go
@@ -87,12 +87,10 @@ func (i *IOQueue) Get(block bool, delay *time.Duration) 
(IOCBContract, error) {
        if len(i.Queue) == 0 {
                if delay != nil {
                        gotSomething := make(chan struct{})
-                       i.wg.Add(1)
-                       go func() {
-                               defer i.wg.Done()
+                       i.wg.Go(func() {
                                i.notEmpty.Wait()
                                close(gotSomething)
-                       }()
+                       })
                        timeout := time.NewTimer(*delay)
                        select {
                        case <-gotSomething:
diff --git a/plc4go/internal/bacnetip/bacgopes/udp/udp_UDPDirector.go 
b/plc4go/internal/bacnetip/bacgopes/udp/udp_UDPDirector.go
index e5188ba5b0..ce4c9dd6f9 100644
--- a/plc4go/internal/bacnetip/bacgopes/udp/udp_UDPDirector.go
+++ b/plc4go/internal/bacnetip/bacgopes/udp/udp_UDPDirector.go
@@ -96,19 +96,15 @@ func NewUDPDirector(localLog zerolog.Logger, address 
AddressTuple[string, uint16
        }
 
        d.running = true
-       d.wg.Add(1)
-       go func() {
-               defer d.wg.Done()
+       d.wg.Go(func() {
                for d.running {
                        d.handleRead()
                }
-       }()
+       })
 
        // create the request queue
        d.request = make(chan PDU)
-       d.wg.Add(1)
-       go func() {
-               defer d.wg.Done()
+       d.wg.Go(func() {
                for d.running {
                        pdu := <-d.request
                        serialize, err := pdu.GetRootMessage().Serialize()
@@ -131,7 +127,7 @@ func NewUDPDirector(localLog zerolog.Logger, address 
AddressTuple[string, uint16
                        }
                        localLog.Debug().Int("writtenBytes", 
writtenBytes).Msg("written bytes")
                }
-       }()
+       })
 
        // start with an empty peer pool
        d.peers = map[string]*UDPActor{}
@@ -226,13 +222,11 @@ func (d *UDPDirector) handleRead() {
        }
        pdu := NewCPDU(readBytes, NKW(KWCPCISource, saddr, KWCPCIDestination, 
daddr), WithRootMessage(bvlc)) // TODO: why do we set the destination here??? 
This might be completely wrong
        // send the _PDU up to the client
-       d.wg.Add(1)
-       go func() {
-               defer d.wg.Done()
+       d.wg.Go(func() {
                if err := d._response(pdu); err != nil {
                        d.log.Debug().Err(err).Msg("errored")
                }
-       }()
+       })
 }
 
 func (d *UDPDirector) writeable() {
diff --git a/plc4go/internal/cbus/Browser.go b/plc4go/internal/cbus/Browser.go
index 689348d8ff..85a3f73b38 100644
--- a/plc4go/internal/cbus/Browser.go
+++ b/plc4go/internal/cbus/Browser.go
@@ -339,9 +339,7 @@ func (m *Browser) getInstalledUnitAddressBytes(ctx 
context.Context) (map[byte]an
        readCtx, readCtxCancel := context.WithTimeout(ctx, 2*time.Second)
        defer readCtxCancel()
        readWg := new(sync.WaitGroup)
-       readWg.Add(1)
-       go func() {
-               defer readWg.Done()
+       readWg.Go(func() {
                defer func() {
                        if err := recover(); err != nil {
                                m.log.Error().
@@ -431,7 +429,7 @@ func (m *Browser) getInstalledUnitAddressBytes(ctx 
context.Context) (map[byte]an
                                Stringer("responseCode", responseCode).
                                Msg("We got responseCode as response code for 
installation mmi so we rely on getting it via subscription")
                }
-       }()
+       })
 
        syncCtx, syncCtxCancel := context.WithTimeout(ctx, 6*time.Second)
        defer syncCtxCancel()
diff --git a/plc4go/internal/cbus/CBusMessageMapper_test.go 
b/plc4go/internal/cbus/CBusMessageMapper_test.go
index 31833fe0e8..f542c78d73 100644
--- a/plc4go/internal/cbus/CBusMessageMapper_test.go
+++ b/plc4go/internal/cbus/CBusMessageMapper_test.go
@@ -20,6 +20,7 @@
 package cbus
 
 import (
+       "context"
        "fmt"
        "testing"
 
@@ -1424,7 +1425,7 @@ func TestMapEncodedReply(t *testing.T) {
                                })
                                transaction := 
transactionManager.StartTransaction()
                                t.Logf("Submitting No-Op to transaction\n%v", 
transaction)
-                               transaction.Submit(func(transaction 
transactions.RequestTransaction) {
+                               transaction.Submit(func(context.Context, 
transactions.RequestTransaction) {
                                        // NO-OP
                                })
                                args.transaction = transaction
@@ -1501,7 +1502,7 @@ func TestMapEncodedReply(t *testing.T) {
                                })
                                transaction := 
transactionManager.StartTransaction()
                                t.Logf("Submitting No-Op to transaction %v", 
transaction)
-                               transaction.Submit(func(transaction 
transactions.RequestTransaction) {
+                               transaction.Submit(func(context.Context, 
transactions.RequestTransaction) {
                                        // NO-OP
                                        t.Log("No op-ing")
                                })
@@ -1594,7 +1595,7 @@ func TestMapEncodedReply(t *testing.T) {
                                        assert.NoError(t, 
transactionManager.Close())
                                })
                                transaction := 
transactionManager.StartTransaction()
-                               transaction.Submit(func(transaction 
transactions.RequestTransaction) {
+                               transaction.Submit(func(context.Context, 
transactions.RequestTransaction) {
                                        // NO-OP
                                })
                                args.transaction = transaction
@@ -1677,7 +1678,7 @@ func TestMapEncodedReply(t *testing.T) {
                                        assert.NoError(t, 
transactionManager.Close())
                                })
                                transaction := 
transactionManager.StartTransaction()
-                               transaction.Submit(func(transaction 
transactions.RequestTransaction) {
+                               transaction.Submit(func(context.Context, 
transactions.RequestTransaction) {
                                        // NO-OP
                                })
                                args.transaction = transaction
@@ -1743,7 +1744,7 @@ func TestMapEncodedReply(t *testing.T) {
                                        assert.NoError(t, 
transactionManager.Close())
                                })
                                transaction := 
transactionManager.StartTransaction()
-                               transaction.Submit(func(transaction 
transactions.RequestTransaction) {
+                               transaction.Submit(func(context.Context, 
transactions.RequestTransaction) {
                                        // NO-OP
                                })
                                args.transaction = transaction
@@ -1812,7 +1813,7 @@ func TestMapEncodedReply(t *testing.T) {
                                        assert.NoError(t, 
transactionManager.Close())
                                })
                                transaction := 
transactionManager.StartTransaction()
-                               transaction.Submit(func(transaction 
transactions.RequestTransaction) {
+                               transaction.Submit(func(context.Context, 
transactions.RequestTransaction) {
                                        // NO-OP
                                })
                                args.transaction = transaction
@@ -1900,7 +1901,7 @@ func TestMapEncodedReply(t *testing.T) {
                                        assert.NoError(t, 
transactionManager.Close())
                                })
                                transaction := 
transactionManager.StartTransaction()
-                               transaction.Submit(func(transaction 
transactions.RequestTransaction) {
+                               transaction.Submit(func(context.Context, 
transactions.RequestTransaction) {
                                        // NO-OP
                                })
                                args.transaction = transaction
@@ -2006,7 +2007,7 @@ func TestMapEncodedReply(t *testing.T) {
                                        assert.NoError(t, 
transactionManager.Close())
                                })
                                transaction := 
transactionManager.StartTransaction()
-                               transaction.Submit(func(transaction 
transactions.RequestTransaction) {
+                               transaction.Submit(func(context.Context, 
transactions.RequestTransaction) {
                                        // NO-OP
                                })
                                args.transaction = transaction
@@ -2076,7 +2077,7 @@ func TestMapEncodedReply(t *testing.T) {
                                        assert.NoError(t, 
transactionManager.Close())
                                })
                                transaction := 
transactionManager.StartTransaction()
-                               transaction.Submit(func(transaction 
transactions.RequestTransaction) {
+                               transaction.Submit(func(context.Context, 
transactions.RequestTransaction) {
                                        // NO-OP
                                })
                                args.transaction = transaction
@@ -2142,7 +2143,7 @@ func TestMapEncodedReply(t *testing.T) {
                                        assert.NoError(t, 
transactionManager.Close())
                                })
                                transaction := 
transactionManager.StartTransaction()
-                               transaction.Submit(func(transaction 
transactions.RequestTransaction) {
+                               transaction.Submit(func(context.Context, 
transactions.RequestTransaction) {
                                        // NO-OP
                                })
                                args.transaction = transaction
@@ -2208,7 +2209,7 @@ func TestMapEncodedReply(t *testing.T) {
                                        assert.NoError(t, 
transactionManager.Close())
                                })
                                transaction := 
transactionManager.StartTransaction()
-                               transaction.Submit(func(transaction 
transactions.RequestTransaction) {
+                               transaction.Submit(func(context.Context, 
transactions.RequestTransaction) {
                                        // NO-OP
                                })
                                args.transaction = transaction
@@ -2274,7 +2275,7 @@ func TestMapEncodedReply(t *testing.T) {
                                        assert.NoError(t, 
transactionManager.Close())
                                })
                                transaction := 
transactionManager.StartTransaction()
-                               transaction.Submit(func(transaction 
transactions.RequestTransaction) {
+                               transaction.Submit(func(context.Context, 
transactions.RequestTransaction) {
                                        // NO-OP
                                })
                                args.transaction = transaction
@@ -2340,7 +2341,7 @@ func TestMapEncodedReply(t *testing.T) {
                                        assert.NoError(t, 
transactionManager.Close())
                                })
                                transaction := 
transactionManager.StartTransaction()
-                               transaction.Submit(func(transaction 
transactions.RequestTransaction) {
+                               transaction.Submit(func(context.Context, 
transactions.RequestTransaction) {
                                        // NO-OP
                                })
                                args.transaction = transaction
@@ -2426,7 +2427,7 @@ func TestMapEncodedReply(t *testing.T) {
                                        assert.NoError(t, 
transactionManager.Close())
                                })
                                transaction := 
transactionManager.StartTransaction()
-                               transaction.Submit(func(transaction 
transactions.RequestTransaction) {
+                               transaction.Submit(func(context.Context, 
transactions.RequestTransaction) {
                                        // NO-OP
                                })
                                args.transaction = transaction
@@ -2492,7 +2493,7 @@ func TestMapEncodedReply(t *testing.T) {
                                        assert.NoError(t, 
transactionManager.Close())
                                })
                                transaction := 
transactionManager.StartTransaction()
-                               transaction.Submit(func(transaction 
transactions.RequestTransaction) {
+                               transaction.Submit(func(context.Context, 
transactions.RequestTransaction) {
                                        // NO-OP
                                })
                                args.transaction = transaction
@@ -2558,7 +2559,7 @@ func TestMapEncodedReply(t *testing.T) {
                                        assert.NoError(t, 
transactionManager.Close())
                                })
                                transaction := 
transactionManager.StartTransaction()
-                               transaction.Submit(func(transaction 
transactions.RequestTransaction) {
+                               transaction.Submit(func(context.Context, 
transactions.RequestTransaction) {
                                        // NO-OP
                                })
                                args.transaction = transaction
@@ -2624,7 +2625,7 @@ func TestMapEncodedReply(t *testing.T) {
                                        assert.NoError(t, 
transactionManager.Close())
                                })
                                transaction := 
transactionManager.StartTransaction()
-                               transaction.Submit(func(transaction 
transactions.RequestTransaction) {
+                               transaction.Submit(func(context.Context, 
transactions.RequestTransaction) {
                                        // NO-OP
                                })
                                args.transaction = transaction
@@ -2690,7 +2691,7 @@ func TestMapEncodedReply(t *testing.T) {
                                        assert.NoError(t, 
transactionManager.Close())
                                })
                                transaction := 
transactionManager.StartTransaction()
-                               transaction.Submit(func(transaction 
transactions.RequestTransaction) {
+                               transaction.Submit(func(context.Context, 
transactions.RequestTransaction) {
                                        // NO-OP
                                })
                                args.transaction = transaction
@@ -2756,7 +2757,7 @@ func TestMapEncodedReply(t *testing.T) {
                                        assert.NoError(t, 
transactionManager.Close())
                                })
                                transaction := 
transactionManager.StartTransaction()
-                               transaction.Submit(func(transaction 
transactions.RequestTransaction) {
+                               transaction.Submit(func(context.Context, 
transactions.RequestTransaction) {
                                        // NO-OP
                                })
                                args.transaction = transaction
@@ -2838,7 +2839,7 @@ func TestMapEncodedReply(t *testing.T) {
                                        assert.NoError(t, 
transactionManager.Close())
                                })
                                transaction := 
transactionManager.StartTransaction()
-                               transaction.Submit(func(transaction 
transactions.RequestTransaction) {
+                               transaction.Submit(func(context.Context, 
transactions.RequestTransaction) {
                                        // NO-OP
                                })
                                args.transaction = transaction
@@ -2904,7 +2905,7 @@ func TestMapEncodedReply(t *testing.T) {
                                        assert.NoError(t, 
transactionManager.Close())
                                })
                                transaction := 
transactionManager.StartTransaction()
-                               transaction.Submit(func(transaction 
transactions.RequestTransaction) {
+                               transaction.Submit(func(context.Context, 
transactions.RequestTransaction) {
                                        // NO-OP
                                })
                                args.transaction = transaction
@@ -2970,7 +2971,7 @@ func TestMapEncodedReply(t *testing.T) {
                                        assert.NoError(t, 
transactionManager.Close())
                                })
                                transaction := 
transactionManager.StartTransaction()
-                               transaction.Submit(func(transaction 
transactions.RequestTransaction) {
+                               transaction.Submit(func(context.Context, 
transactions.RequestTransaction) {
                                        // NO-OP
                                })
                                args.transaction = transaction
diff --git a/plc4go/internal/cbus/Connection.go 
b/plc4go/internal/cbus/Connection.go
index d2ae5a5378..b16248c280 100644
--- a/plc4go/internal/cbus/Connection.go
+++ b/plc4go/internal/cbus/Connection.go
@@ -129,9 +129,7 @@ func (c *Connection) GetMessageCodec() spi.MessageCodec {
 func (c *Connection) ConnectWithContext(ctx context.Context) <-chan 
plc4go.PlcConnectionConnectResult {
        c.log.Trace().Msg("Connecting")
        ch := make(chan plc4go.PlcConnectionConnectResult, 1)
-       c.wg.Add(1)
-       go func() {
-               defer c.wg.Done()
+       c.wg.Go(func() {
                defer func() {
                        if err := recover(); err != nil {
                                c.fireConnectionError(errors.Errorf("panic-ed 
%v. Stack:\n%s", err, debug.Stack()), ch)
@@ -154,21 +152,19 @@ func (c *Connection) ConnectWithContext(ctx 
context.Context) <-chan plc4go.PlcCo
                }
 
                c.setupConnection(ctx, ch)
-       }()
+       })
        return ch
 }
 
 func (c *Connection) Close() <-chan plc4go.PlcConnectionCloseResult {
        results := make(chan plc4go.PlcConnectionCloseResult, 1)
-       c.wg.Add(1)
-       go func() {
-               defer c.wg.Done()
+       c.wg.Go(func() {
                result := <-c.DefaultConnection.Close()
                c.log.Trace().Msg("Waiting for handlers to stop")
                c.handlerWaitGroup.Wait()
                c.log.Trace().Msg("handlers stopped, dispatching result")
                results <- result
-       }()
+       })
        return results
 }
 
@@ -271,12 +267,8 @@ func (c *Connection) setupConnection(ctx context.Context, 
ch chan plc4go.PlcConn
 
 func (c *Connection) startSubscriptionHandler() {
        c.log.Debug().Msg("Starting SAL handler")
-       c.handlerWaitGroup.Add(1)
-       c.wg.Add(1)
-       go func() {
-               defer c.wg.Done()
+       c.handlerWaitGroup.Go(func() {
                salLogger := c.log.With().Str("handlerType", "SAL").Logger()
-               defer c.handlerWaitGroup.Done()
                defer func() {
                        if err := recover(); err != nil {
                                salLogger.Error().
@@ -313,14 +305,10 @@ func (c *Connection) startSubscriptionHandler() {
                        }
                }
                salLogger.Info().Msg("handler ended")
-       }()
+       })
        c.log.Debug().Msg("Starting MMI handler")
-       c.handlerWaitGroup.Add(1)
-       c.wg.Add(1)
-       go func() {
-               defer c.wg.Done()
+       c.handlerWaitGroup.Go(func() {
                mmiLogger := c.log.With().Str("handlerType", "MMI").Logger()
-               defer c.handlerWaitGroup.Done()
                defer func() {
                        if err := recover(); err != nil {
                                mmiLogger.Error().
@@ -352,7 +340,7 @@ func (c *Connection) startSubscriptionHandler() {
                        }
                }
                mmiLogger.Info().Msg("handler ended")
-       }()
+       })
 }
 
 func (c *Connection) sendReset(ctx context.Context, ch chan 
plc4go.PlcConnectionConnectResult, cbusOptions *readWriteModel.CBusOptions, 
requestContext *readWriteModel.RequestContext, sendOutErrorNotification bool) 
(ok bool) {
diff --git a/plc4go/internal/cbus/Connection_test.go 
b/plc4go/internal/cbus/Connection_test.go
index f77bbd3c57..cea1bfd9da 100644
--- a/plc4go/internal/cbus/Connection_test.go
+++ b/plc4go/internal/cbus/Connection_test.go
@@ -1685,10 +1685,8 @@ func TestConnection_startSubscriptionHandler(t 
*testing.T) {
                                codec.monitoredMMIs = make(chan 
readWriteModel.CALReply, 1)
                                codec.monitoredSALs = make(chan 
readWriteModel.MonitoredSAL, 1)
                                dispatchWg := new(sync.WaitGroup)
-                               dispatchWg.Add(1)
                                t.Cleanup(dispatchWg.Wait)
-                               go func() {
-                                       defer dispatchWg.Done()
+                               dispatchWg.Go(func() {
                                        codec.monitoredMMIs <- 
readWriteModel.NewCALReplyShort(0, nil)
                                        codec.monitoredSALs <- 
readWriteModel.NewMonitoredSALShortFormBasicMode(
                                                0,
@@ -1699,7 +1697,7 @@ func TestConnection_startSubscriptionHandler(t 
*testing.T) {
                                                
readWriteModel.ApplicationIdContainer_ACCESS_CONTROL_D5,
                                                nil,
                                        )
-                               }()
+                               })
                                t.Cleanup(func() {
                                        assert.NoError(t, codec.Disconnect())
                                })
@@ -1718,10 +1716,8 @@ func TestConnection_startSubscriptionHandler(t 
*testing.T) {
                                codec := NewMessageCodec(nil, _options...)
                                written := make(chan struct{})
                                dispatchWg := new(sync.WaitGroup)
-                               dispatchWg.Add(1)
                                t.Cleanup(dispatchWg.Wait)
-                               go func() {
-                                       defer dispatchWg.Done()
+                               dispatchWg.Go(func() {
                                        codec.monitoredMMIs <- 
readWriteModel.NewCALReplyShort(0, nil)
                                        codec.monitoredSALs <- 
readWriteModel.NewMonitoredSALShortFormBasicMode(
                                                0,
@@ -1733,7 +1729,7 @@ func TestConnection_startSubscriptionHandler(t 
*testing.T) {
                                                nil,
                                        )
                                        close(written)
-                               }()
+                               })
                                t.Cleanup(func() {
                                        <-written
                                })
diff --git a/plc4go/internal/cbus/Reader.go b/plc4go/internal/cbus/Reader.go
index ed7dc46e28..75a5eac2e8 100644
--- a/plc4go/internal/cbus/Reader.go
+++ b/plc4go/internal/cbus/Reader.go
@@ -128,7 +128,9 @@ func (m *Reader) readSync(ctx context.Context, readRequest 
apiModel.PlcReadReque
 func (m *Reader) createMessageTransactionAndWait(ctx context.Context, 
messageToSend readWriteModel.CBusMessage, addResponseCode func(name string, 
responseCode apiModel.PlcResponseCode), tagName string, addPlcValue func(name 
string, plcValue apiValues.PlcValue)) {
        // Start a new request-transaction (Is ended in the response-handler)
        transaction := m.tm.StartTransaction()
-       transaction.Submit(func(transaction transactions.RequestTransaction) {
+       transaction.Submit(func(transactionContext context.Context, transaction 
transactions.RequestTransaction) {
+               ctx, cancel := context.WithCancel(ctx)
+               context.AfterFunc(transactionContext, cancel)
                m.log.Trace().Stringer("transaction", 
transaction).Msg("Transaction getting handled")
                m.sendMessageOverTheWire(ctx, transaction, messageToSend, 
addResponseCode, tagName, addPlcValue)
        })
diff --git a/plc4go/internal/cbus/Subscriber.go 
b/plc4go/internal/cbus/Subscriber.go
index 939858b5c8..73801431c6 100644
--- a/plc4go/internal/cbus/Subscriber.go
+++ b/plc4go/internal/cbus/Subscriber.go
@@ -64,9 +64,7 @@ func NewSubscriber(addSubscriber func(subscriber 
*Subscriber), _options ...optio
 
 func (s *Subscriber) Subscribe(_ context.Context, subscriptionRequest 
apiModel.PlcSubscriptionRequest) <-chan apiModel.PlcSubscriptionRequestResult {
        result := make(chan apiModel.PlcSubscriptionRequestResult, 1)
-       s.wg.Add(1)
-       go func() {
-               defer s.wg.Done()
+       s.wg.Go(func() {
                defer func() {
                        if err := recover(); err != nil {
                                result <- 
spiModel.NewDefaultPlcSubscriptionRequestResult(subscriptionRequest, nil, 
errors.Errorf("panic-ed %v. Stack: %s", err, debug.Stack()))
@@ -106,7 +104,7 @@ func (s *Subscriber) Subscribe(_ context.Context, 
subscriptionRequest apiModel.P
                        ),
                        nil,
                )
-       }()
+       })
        return result
 }
 
diff --git a/plc4go/internal/cbus/Writer.go b/plc4go/internal/cbus/Writer.go
index a83ade5de4..9e7fc790bd 100644
--- a/plc4go/internal/cbus/Writer.go
+++ b/plc4go/internal/cbus/Writer.go
@@ -60,9 +60,7 @@ func NewWriter(tpduGenerator *AlphaGenerator, messageCodec 
*MessageCodec, tm tra
 func (m *Writer) Write(ctx context.Context, writeRequest 
apiModel.PlcWriteRequest) <-chan apiModel.PlcWriteRequestResult {
        m.log.Trace().Msg("Writing")
        result := make(chan apiModel.PlcWriteRequestResult, 1)
-       m.wg.Add(1)
-       go func() {
-               defer m.wg.Done()
+       m.wg.Go(func() {
                defer func() {
                        if err := recover(); err != nil {
                                result <- 
spiModel.NewDefaultPlcWriteRequestResult(writeRequest, nil, 
errors.Errorf("panic-ed %v. Stack: %s", err, debug.Stack()))
@@ -116,7 +114,9 @@ func (m *Writer) Write(ctx context.Context, writeRequest 
apiModel.PlcWriteReques
                        tagNameCopy := tagName
                        // Start a new request-transaction (Is ended in the 
response-handler)
                        transaction := m.tm.StartTransaction()
-                       transaction.Submit(func(transaction 
transactions.RequestTransaction) {
+                       transaction.Submit(func(transactionContext 
context.Context, transaction transactions.RequestTransaction) {
+                               ctx, cancel := context.WithCancel(ctx)
+                               context.AfterFunc(transactionContext, cancel)
                                // Send the  over the wire
                                m.log.Trace().Msg("Send ")
                                if err := m.messageCodec.SendRequest(ctx, 
messageToSend, func(receivedMessage spi.Message) bool {
@@ -159,6 +159,6 @@ func (m *Writer) Write(ctx context.Context, writeRequest 
apiModel.PlcWriteReques
                }
                readResponse := 
spiModel.NewDefaultPlcWriteResponse(writeRequest, responseCodes)
                result <- 
spiModel.NewDefaultPlcWriteRequestResult(writeRequest, readResponse, nil)
-       }()
+       })
        return result
 }
diff --git a/plc4go/internal/eip/Connection.go 
b/plc4go/internal/eip/Connection.go
index 9d4d77f62a..46d67a205e 100644
--- a/plc4go/internal/eip/Connection.go
+++ b/plc4go/internal/eip/Connection.go
@@ -130,9 +130,7 @@ func (c *Connection) GetMessageCodec() spi.MessageCodec {
 func (c *Connection) ConnectWithContext(ctx context.Context) <-chan 
plc4go.PlcConnectionConnectResult {
        c.log.Trace().Msg("Connecting")
        ch := make(chan plc4go.PlcConnectionConnectResult, 1)
-       c.wg.Add(1)
-       go func() {
-               defer c.wg.Done()
+       c.wg.Go(func() {
                defer func() {
                        if err := recover(); err != nil {
                                ch <- 
_default.NewDefaultPlcConnectionConnectResult(nil, errors.Errorf("panic-ed %v. 
Stack: %s", err, debug.Stack()))
@@ -155,7 +153,7 @@ func (c *Connection) ConnectWithContext(ctx 
context.Context) <-chan plc4go.PlcCo
                }
 
                c.setupConnection(ctx, ch)
-       }()
+       })
        return ch
 }
 
@@ -163,9 +161,7 @@ func (c *Connection) Close() <-chan 
plc4go.PlcConnectionCloseResult {
        // TODO: use proper context
        ctx := context.TODO()
        result := make(chan plc4go.PlcConnectionCloseResult, 1)
-       c.wg.Add(1)
-       go func() {
-               defer c.wg.Done()
+       c.wg.Go(func() {
                defer func() {
                        if err := recover(); err != nil {
                                result <- 
_default.NewDefaultPlcConnectionCloseResult(c, errors.Errorf("panic-ed %v. 
Stack: %s", err, debug.Stack()))
@@ -193,7 +189,7 @@ func (c *Connection) Close() <-chan 
plc4go.PlcConnectionCloseResult {
                        Uint32("sessionHandle", c.sessionHandle).
                        Msg("Unregistred Session %d")
                result <- _default.NewDefaultPlcConnectionCloseResult(c, nil)
-       }()
+       })
        return result
 }
 
diff --git a/plc4go/internal/eip/Reader.go b/plc4go/internal/eip/Reader.go
index 0e3f6784fa..b645e3be26 100644
--- a/plc4go/internal/eip/Reader.go
+++ b/plc4go/internal/eip/Reader.go
@@ -70,9 +70,7 @@ func (m *Reader) Read(ctx context.Context, readRequest 
apiModel.PlcReadRequest)
        // TODO: handle ctx
        m.log.Trace().Msg("Reading")
        result := make(chan apiModel.PlcReadRequestResult, 1)
-       m.wg.Add(1)
-       go func() {
-               defer m.wg.Done()
+       m.wg.Go(func() {
                defer func() {
                        if err := recover(); err != nil {
                                result <- 
spiModel.NewDefaultPlcReadRequestResult(readRequest, nil, 
errors.Errorf("panic-ed %v. Stack: %s", err, debug.Stack()))
@@ -109,7 +107,9 @@ func (m *Reader) Read(ctx context.Context, readRequest 
apiModel.PlcReadRequest)
                                typeIds,
                        )
                        transaction := m.tm.StartTransaction()
-                       transaction.Submit(func(transaction 
transactions.RequestTransaction) {
+                       transaction.Submit(func(transactionContext 
context.Context, transaction transactions.RequestTransaction) {
+                               ctx, cancel := context.WithCancel(ctx)
+                               context.AfterFunc(transactionContext, cancel)
                                if err := m.messageCodec.SendRequest(
                                        ctx,
                                        request,
@@ -167,7 +167,7 @@ func (m *Reader) Read(ctx context.Context, readRequest 
apiModel.PlcReadRequest)
                                }
                        })
                }
-       }()
+       })
        return result
 }
 
diff --git a/plc4go/internal/eip/Writer.go b/plc4go/internal/eip/Writer.go
index 0c7ef52a16..5bc832ec64 100644
--- a/plc4go/internal/eip/Writer.go
+++ b/plc4go/internal/eip/Writer.go
@@ -66,9 +66,7 @@ func NewWriter(messageCodec spi.MessageCodec, tm 
transactions.RequestTransaction
 func (m *Writer) Write(ctx context.Context, writeRequest 
apiModel.PlcWriteRequest) <-chan apiModel.PlcWriteRequestResult {
        // TODO: handle context
        result := make(chan apiModel.PlcWriteRequestResult, 1)
-       m.wg.Add(1)
-       go func() {
-               defer m.wg.Done()
+       m.wg.Go(func() {
                defer func() {
                        if err := recover(); err != nil {
                                result <- 
spiModel.NewDefaultPlcWriteRequestResult(writeRequest, nil, 
errors.Errorf("panic-ed %v. Stack: %s", err, debug.Stack()))
@@ -283,7 +281,7 @@ func (m *Writer) Write(ctx context.Context, writeRequest 
apiModel.PlcWriteReques
                                                }
                                        })
                                }*/
-       }()
+       })
        return result
 }
 
diff --git a/plc4go/internal/knxnetip/Connection.go 
b/plc4go/internal/knxnetip/Connection.go
index ba1d29104c..1dc7ffaa2e 100644
--- a/plc4go/internal/knxnetip/Connection.go
+++ b/plc4go/internal/knxnetip/Connection.go
@@ -240,9 +240,7 @@ func (m *Connection) ConnectWithContext(ctx 
context.Context) <-chan plc4go.PlcCo
                result <- 
_default.NewDefaultPlcConnectionConnectResult(connection, err)
        }
 
-       m.wg.Add(1)
-       go func() {
-               defer m.wg.Done()
+       m.wg.Go(func() {
                defer func() {
                        if err := recover(); err != nil {
                                result <- 
_default.NewDefaultPlcConnectionConnectResult(nil, errors.Errorf("panic-ed %v. 
Stack: %s", err, debug.Stack()))
@@ -314,9 +312,7 @@ func (m *Connection) ConnectWithContext(ctx 
context.Context) <-chan plc4go.PlcCo
                                // handled by any other handler. This is where 
usually the GroupValueWrite messages
                                // are being handled.
                                m.log.Debug().Msg("Starting tunneling handler")
-                               m.wg.Add(1)
-                               go func() {
-                                       defer m.wg.Done()
+                               m.wg.Go(func() {
                                        defer func() {
                                                if err := recover(); err != nil 
{
                                                        m.log.Error().
@@ -374,7 +370,7 @@ func (m *Connection) ConnectWithContext(ctx 
context.Context) <-chan plc4go.PlcCo
                                                }
                                        }
                                        m.log.Warn().Msg("Tunneling handler 
shat down")
-                               }()
+                               })
 
                                // Fire the "connected" event
                                sendResult(m, nil)
@@ -386,7 +382,7 @@ func (m *Connection) ConnectWithContext(ctx 
context.Context) <-chan plc4go.PlcCo
                } else {
                        m.doSomethingAndClose(func() { sendResult(nil, 
errors.New("this device doesn't support tunneling")) })
                }
-       }()
+       })
 
        return result
 }
@@ -419,9 +415,7 @@ func (m *Connection) Close() <-chan 
plc4go.PlcConnectionCloseResult {
        ctx := context.TODO()
        result := make(chan plc4go.PlcConnectionCloseResult, 1)
 
-       m.wg.Add(1)
-       go func() {
-               defer m.wg.Done()
+       m.wg.Go(func() {
                defer func() {
                        if err := recover(); err != nil {
                                result <- 
_default.NewDefaultPlcConnectionConnectResult(nil, errors.Errorf("panic-ed %v. 
Stack: %s", err, debug.Stack()))
@@ -455,7 +449,7 @@ func (m *Connection) Close() <-chan 
plc4go.PlcConnectionCloseResult {
                } else {
                        result <- 
_default.NewDefaultPlcConnectionCloseResult(m, nil)
                }
-       }()
+       })
 
        return result
 }
@@ -484,9 +478,7 @@ func (m *Connection) Ping() <-chan 
plc4go.PlcConnectionPingResult {
        ctx := context.TODO()
        result := make(chan plc4go.PlcConnectionPingResult, 1)
 
-       m.wg.Add(1)
-       go func() {
-               defer m.wg.Done()
+       m.wg.Go(func() {
                defer func() {
                        if err := recover(); err != nil {
                                result <- 
_default.NewDefaultPlcConnectionPingResult(errors.Errorf("panic-ed %v. Stack: 
%s", err, debug.Stack()))
@@ -500,7 +492,7 @@ func (m *Connection) Ping() <-chan 
plc4go.PlcConnectionPingResult {
                        result <- 
_default.NewDefaultPlcConnectionPingResult(nil)
                }
                return
-       }()
+       })
 
        return result
 }
diff --git a/plc4go/internal/knxnetip/ConnectionDriverSpecificOperations.go 
b/plc4go/internal/knxnetip/ConnectionDriverSpecificOperations.go
index 6e56ff0a8c..c8c7baa048 100644
--- a/plc4go/internal/knxnetip/ConnectionDriverSpecificOperations.go
+++ b/plc4go/internal/knxnetip/ConnectionDriverSpecificOperations.go
@@ -65,9 +65,7 @@ func (m *Connection) ReadGroupAddress(ctx context.Context, 
groupAddress []byte,
                }
        }
 
-       m.wg.Add(1)
-       go func() {
-               defer m.wg.Done()
+       m.wg.Go(func() {
                defer func() {
                        if err := recover(); err != nil {
                                m.log.Error().
@@ -107,7 +105,7 @@ func (m *Connection) ReadGroupAddress(ctx context.Context, 
groupAddress []byte,
 
                // Return the value
                sendResponse(plcValue, 1, nil)
-       }()
+       })
 
        return result
 }
@@ -130,9 +128,7 @@ func (m *Connection) DeviceConnect(ctx context.Context, 
targetAddress driverMode
                }
        }
 
-       m.wg.Add(1)
-       go func() {
-               defer m.wg.Done()
+       m.wg.Go(func() {
                defer func() {
                        if err := recover(); err != nil {
                                m.log.Error().
@@ -208,7 +204,7 @@ func (m *Connection) DeviceConnect(ctx context.Context, 
targetAddress driverMode
                connection.maxApdu = uint16(math.Min(float64(deviceApduSize), 
240))
 
                sendResponse(connection, nil)
-       }()
+       })
 
        return result
 }
@@ -231,9 +227,7 @@ func (m *Connection) DeviceDisconnect(ctx context.Context, 
targetAddress driverM
                }
        }
 
-       m.wg.Add(1)
-       go func() {
-               defer m.wg.Done()
+       m.wg.Go(func() {
                defer func() {
                        if err := recover(); err != nil {
                                m.log.Error().
@@ -252,7 +246,7 @@ func (m *Connection) DeviceDisconnect(ctx context.Context, 
targetAddress driverM
                } else {
                        sendResponse(connection, nil)
                }
-       }()
+       })
 
        return result
 }
@@ -274,9 +268,7 @@ func (m *Connection) DeviceAuthenticate(ctx 
context.Context, targetAddress drive
                }
        }
 
-       m.wg.Add(1)
-       go func() {
-               defer m.wg.Done()
+       m.wg.Go(func() {
                defer func() {
                        if err := recover(); err != nil {
                                m.log.Error().
@@ -315,7 +307,7 @@ func (m *Connection) DeviceAuthenticate(ctx 
context.Context, targetAddress drive
                } else {
                        sendResponse(errors.Errorf("got error authenticating at 
device %s", KnxAddressToString(targetAddress)))
                }
-       }()
+       })
 
        return result
 }
@@ -339,9 +331,7 @@ func (m *Connection) DeviceReadProperty(ctx 
context.Context, targetAddress drive
                }
        }
 
-       m.wg.Add(1)
-       go func() {
-               defer m.wg.Done()
+       m.wg.Go(func() {
                defer func() {
                        if err := recover(); err != nil {
                                m.log.Error().
@@ -405,7 +395,7 @@ func (m *Connection) DeviceReadProperty(ctx 
context.Context, targetAddress drive
                } else {
                        sendResponse(plcValue, 1, err)
                }
-       }()
+       })
 
        return result
 }
@@ -429,9 +419,7 @@ func (m *Connection) DeviceReadPropertyDescriptor(ctx 
context.Context, targetAdd
                }
        }
 
-       m.wg.Add(1)
-       go func() {
-               defer m.wg.Done()
+       m.wg.Go(func() {
                defer func() {
                        if err := recover(); err != nil {
                                m.log.Error().
@@ -475,7 +463,7 @@ func (m *Connection) DeviceReadPropertyDescriptor(ctx 
context.Context, targetAdd
                val["writeLevel"] = 
spiValues.NewPlcSTRING(propertyDescriptionResponse.GetWriteLevel().String())
                str := spiValues.NewPlcStruct(val)
                sendResponse(&str, 1, nil)
-       }()
+       })
 
        return result
 }
@@ -499,9 +487,7 @@ func (m *Connection) DeviceReadMemory(ctx context.Context, 
targetAddress driverM
                }
        }
 
-       m.wg.Add(1)
-       go func() {
-               defer m.wg.Done()
+       m.wg.Go(func() {
                defer func() {
                        if err := recover(); err != nil {
                                m.log.Error().
@@ -588,7 +574,7 @@ func (m *Connection) DeviceReadMemory(ctx context.Context, 
targetAddress driverM
                } else if len(results) == 1 {
                        sendResponse(results[0], 1, nil)
                }
-       }()
+       })
 
        return result
 }
diff --git a/plc4go/internal/knxnetip/ConnectionHelper.go 
b/plc4go/internal/knxnetip/ConnectionHelper.go
index feadf9bd67..1a4b7d55af 100644
--- a/plc4go/internal/knxnetip/ConnectionHelper.go
+++ b/plc4go/internal/knxnetip/ConnectionHelper.go
@@ -54,9 +54,7 @@ func (m *Connection) castIpToKnxAddress(ip net.IP) 
driverModel.IPAddress {
 }
 
 func (m *Connection) handleIncomingTunnelingRequest(ctx context.Context, 
tunnelingRequest driverModel.TunnelingRequest) {
-       m.wg.Add(1)
-       go func() {
-               defer m.wg.Done()
+       m.wg.Go(func() {
                defer func() {
                        if err := recover(); err != nil {
                                m.log.Error().
@@ -115,7 +113,7 @@ func (m *Connection) handleIncomingTunnelingRequest(ctx 
context.Context, tunneli
                default:
                        m.log.Info().Msg("Unknown unhandled message.")
                }
-       }()
+       })
 }
 
 func (m *Connection) handleValueCacheUpdate(ctx context.Context, 
destinationAddress []byte, payload []byte) {
@@ -142,9 +140,7 @@ func (m *Connection) handleTimeout() {
        // If this is the first timeout in a sequence, start the timer.
        /*      if m.connectionTimeoutTimer == nil {
                m.connectionTimeoutTimer = time.NewTimer(m.connectionTtl)
-               m.wg.Add(1)
-               go func() {
-                       defer m.wg.Done()
+                       m.wg.Go(func() {
                        <-m.connectionTimeoutTimer.C
                        m.resetConnection()
                }()
diff --git a/plc4go/internal/knxnetip/Discoverer.go 
b/plc4go/internal/knxnetip/Discoverer.go
index a6a98a7603..67568a4852 100644
--- a/plc4go/internal/knxnetip/Discoverer.go
+++ b/plc4go/internal/knxnetip/Discoverer.go
@@ -119,8 +119,7 @@ func (d *Discoverer) Discover(ctx context.Context, callback 
func(event apiModel.
                if err != nil {
                        return err
                }
-               wg.Add(1)
-               go func(netInterface net.Interface) {
+               wg.Go(func() {
                        defer func() {
                                if err := recover(); err != nil {
                                        d.log.Error().
@@ -129,7 +128,6 @@ func (d *Discoverer) Discover(ctx context.Context, callback 
func(event apiModel.
                                                Msg("panic-ed")
                                }
                        }()
-                       defer func() { wg.Done() }()
                        // Iterate over all addresses the current interface has 
configured
                        // For KNX we're only interested in IPv4 addresses, as 
it doesn't
                        // seem to work with IPv6.
@@ -157,19 +155,15 @@ func (d *Discoverer) Discover(ctx context.Context, 
callback func(event apiModel.
                                }
                                d.transportInstanceCreationQueue.Submit(ctx, 
d.transportInstanceCreationWorkItemId.Add(1), 
d.createTransportInstanceDispatcher(ctx, wg, connectionUrl, ipv4Addr, 
udpTransport, transportInstances))
                        }
-               }(netInterface)
+               })
        }
-       d.wg.Add(1)
-       go func() {
-               defer d.wg.Done()
+       d.wg.Go(func() {
                wg.Wait()
                d.log.Trace().Msg("Closing transport instance channel")
                close(transportInstances)
-       }()
+       })
 
-       d.wg.Add(1)
-       go func() {
-               defer d.wg.Done()
+       d.wg.Go(func() {
                defer func() {
                        if err := recover(); err != nil {
                                d.log.Error().
@@ -185,13 +179,15 @@ func (d *Discoverer) Discover(ctx context.Context, 
callback func(event apiModel.
                        }
                        d.deviceScanningQueue.Submit(ctx, 
d.deviceScanningWorkItemId.Add(1), d.createDeviceScanDispatcher(ctx, 
transportInstance.(*udp.TransportInstance), callback))
                }
-       }()
+       })
        return nil
 }
 
 func (d *Discoverer) createTransportInstanceDispatcher(ctx context.Context, wg 
*sync.WaitGroup, connectionUrl *url.URL, ipv4Addr net.IP, udpTransport 
*udp.Transport, transportInstances chan transports.TransportInstance) 
pool.Runnable {
        wg.Add(1)
-       return func() {
+       return func(workerCtx context.Context) {
+               ctx, cancel := context.WithCancel(ctx)
+               context.AfterFunc(workerCtx, cancel)
                defer wg.Done()
                // Create a new "connection" (Actually open a local udp socket 
and target outgoing packets to that address)
                transportInstance, err :=
@@ -212,7 +208,9 @@ func (d *Discoverer) createTransportInstanceDispatcher(ctx 
context.Context, wg *
 }
 
 func (d *Discoverer) createDeviceScanDispatcher(ctx context.Context, 
udpTransportInstance *udp.TransportInstance, callback func(event 
apiModel.PlcDiscoveryItem)) pool.Runnable {
-       return func() {
+       return func(workerCtx context.Context) {
+               ctx, cancel := context.WithCancel(ctx)
+               context.AfterFunc(workerCtx, cancel)
                d.log.Debug().Stringer("udpTransportInstance", 
udpTransportInstance).Msg("Scanning")
                // Create a codec for sending and receiving messages.
                codec := NewMessageCodec(
diff --git a/plc4go/internal/knxnetip/Reader.go 
b/plc4go/internal/knxnetip/Reader.go
index 6a22617701..cbc2634694 100644
--- a/plc4go/internal/knxnetip/Reader.go
+++ b/plc4go/internal/knxnetip/Reader.go
@@ -58,9 +58,7 @@ func NewReader(connection *Connection, _options 
...options.WithOption) *Reader {
 func (m *Reader) Read(ctx context.Context, readRequest 
apiModel.PlcReadRequest) <-chan apiModel.PlcReadRequestResult {
        // TODO: handle ctx
        resultChan := make(chan apiModel.PlcReadRequestResult, 1)
-       m.wg.Add(1)
-       go func() {
-               defer m.wg.Done()
+       m.wg.Go(func() {
                defer func() {
                        if err := recover(); err != nil {
                                resultChan <- 
spiModel.NewDefaultPlcReadRequestResult(readRequest, nil, 
errors.Errorf("panic-ed %v. Stack: %s", err, debug.Stack()))
@@ -179,7 +177,7 @@ func (m *Reader) Read(ctx context.Context, readRequest 
apiModel.PlcReadRequest)
                        result,
                        nil,
                )
-       }()
+       })
        return resultChan
 }
 
diff --git a/plc4go/internal/knxnetip/Subscriber.go 
b/plc4go/internal/knxnetip/Subscriber.go
index 1ab0675939..19f3b64f96 100644
--- a/plc4go/internal/knxnetip/Subscriber.go
+++ b/plc4go/internal/knxnetip/Subscriber.go
@@ -64,9 +64,7 @@ func NewSubscriber(connection *Connection, _options 
...options.WithOption) *Subs
 func (s *Subscriber) Subscribe(ctx context.Context, subscriptionRequest 
apiModel.PlcSubscriptionRequest) <-chan apiModel.PlcSubscriptionRequestResult {
        // TODO: handle context
        result := make(chan apiModel.PlcSubscriptionRequestResult, 1)
-       s.wg.Add(1)
-       go func() {
-               defer s.wg.Done()
+       s.wg.Go(func() {
                defer func() {
                        if err := recover(); err != nil {
                                result <- 
spiModel.NewDefaultPlcSubscriptionRequestResult(subscriptionRequest, nil, 
errors.Errorf("panic-ed %v. Stack: %s", err, debug.Stack()))
@@ -96,7 +94,7 @@ func (s *Subscriber) Subscribe(ctx context.Context, 
subscriptionRequest apiModel
                        ),
                        nil,
                )
-       }()
+       })
        return result
 }
 
diff --git a/plc4go/internal/modbus/AsciiDriver.go 
b/plc4go/internal/modbus/AsciiDriver.go
index 068a4d307d..55eec7524c 100644
--- a/plc4go/internal/modbus/AsciiDriver.go
+++ b/plc4go/internal/modbus/AsciiDriver.go
@@ -93,9 +93,7 @@ func (d *AsciiDriver) GetConnectionWithContext(ctx 
context.Context, transportUrl
        // Create a new codec for taking care of encoding/decoding of messages
        // TODO: the code below looks strange: where is defaultChanel being 
used?
        defaultChanel := make(chan any)
-       d.wg.Add(1)
-       go func() {
-               defer d.wg.Done()
+       d.wg.Go(func() {
                defer func() {
                        if err := recover(); err != nil {
                                d.log.Error().
@@ -109,7 +107,7 @@ func (d *AsciiDriver) GetConnectionWithContext(ctx 
context.Context, transportUrl
                        adu := msg.(model.ModbusTcpADU)
                        d.log.Debug().Stringer("adu", adu).Msg("got message in 
the default handler %s\n")
                }
-       }()
+       })
        codec := NewMessageCodec(
                transportInstance,
                append(d._options, options.WithCustomLogger(d.log))...,
diff --git a/plc4go/internal/modbus/Connection.go 
b/plc4go/internal/modbus/Connection.go
index 4d73f9b43b..018956e5a4 100644
--- a/plc4go/internal/modbus/Connection.go
+++ b/plc4go/internal/modbus/Connection.go
@@ -110,9 +110,7 @@ func (c *Connection) Ping() <-chan 
plc4go.PlcConnectionPingResult {
        ctx := context.TODO()
        c.log.Trace().Msg("Pinging")
        result := make(chan plc4go.PlcConnectionPingResult, 1)
-       c.wg.Add(1)
-       go func() {
-               defer c.wg.Done()
+       c.wg.Go(func() {
                defer func() {
                        if err := recover(); err != nil {
                                result <- 
_default.NewDefaultPlcConnectionPingResult(errors.Errorf("panic-ed %v. Stack: 
%s", err, debug.Stack()))
@@ -149,7 +147,7 @@ func (c *Connection) Ping() <-chan 
plc4go.PlcConnectionPingResult {
                ); err != nil {
                        result <- 
_default.NewDefaultPlcConnectionPingResult(err)
                }
-       }()
+       })
        return result
 }
 
diff --git a/plc4go/internal/modbus/Reader.go b/plc4go/internal/modbus/Reader.go
index 7609baec06..0f9cb72ae5 100644
--- a/plc4go/internal/modbus/Reader.go
+++ b/plc4go/internal/modbus/Reader.go
@@ -65,9 +65,7 @@ func (m *Reader) Read(ctx context.Context, readRequest 
apiModel.PlcReadRequest)
        // TODO: handle ctx
        m.log.Trace().Msg("Reading")
        result := make(chan apiModel.PlcReadRequestResult, 1)
-       m.wg.Add(1)
-       go func() {
-               defer m.wg.Done()
+       m.wg.Go(func() {
                defer func() {
                        if err := recover(); err != nil {
                                result <- 
spiModel.NewDefaultPlcReadRequestResult(readRequest, nil, 
errors.Errorf("panic-ed %v. Stack: %s", err, debug.Stack()))
@@ -175,7 +173,7 @@ func (m *Reader) Read(ctx context.Context, readRequest 
apiModel.PlcReadRequest)
                                errors.Wrap(err, "error sending message"),
                        )
                }
-       }()
+       })
        return result
 }
 
diff --git a/plc4go/internal/modbus/RtuDriver.go 
b/plc4go/internal/modbus/RtuDriver.go
index 19b7cdc617..2a41b3113f 100644
--- a/plc4go/internal/modbus/RtuDriver.go
+++ b/plc4go/internal/modbus/RtuDriver.go
@@ -93,9 +93,7 @@ func (d *RtuDriver) GetConnectionWithContext(ctx 
context.Context, transportUrl u
        // Create a new codec for taking care of encoding/decoding of messages
        // TODO: the code below looks strange: where is defaultChanel being 
used?
        defaultChanel := make(chan any)
-       d.wg.Add(1)
-       go func() {
-               defer d.wg.Done()
+       d.wg.Go(func() {
                defer func() {
                        if err := recover(); err != nil {
                                d.log.Error().
@@ -109,7 +107,7 @@ func (d *RtuDriver) GetConnectionWithContext(ctx 
context.Context, transportUrl u
                        adu := msg.(model.ModbusTcpADU)
                        d.log.Debug().Stringer("adu", adu).Msg("got message in 
the default handler")
                }
-       }()
+       })
        codec := NewMessageCodec(
                transportInstance,
                append(d._options, options.WithCustomLogger(d.log))...,
diff --git a/plc4go/internal/modbus/TcpDriver.go 
b/plc4go/internal/modbus/TcpDriver.go
index 47c9ff3b0d..498e852d21 100644
--- a/plc4go/internal/modbus/TcpDriver.go
+++ b/plc4go/internal/modbus/TcpDriver.go
@@ -93,9 +93,7 @@ func (d *TcpDriver) GetConnectionWithContext(ctx 
context.Context, transportUrl u
        // Create a new codec for taking care of encoding/decoding of messages
        // TODO: the code below looks strange: where is defaultChanel being 
used?
        defaultChanel := make(chan any)
-       d.wg.Add(1)
-       go func() {
-               defer d.wg.Done()
+       d.wg.Go(func() {
                defer func() {
                        if err := recover(); err != nil {
                                d.log.Error().
@@ -109,7 +107,7 @@ func (d *TcpDriver) GetConnectionWithContext(ctx 
context.Context, transportUrl u
                        adu := msg.(model.ModbusTcpADU)
                        d.log.Debug().Stringer("adu", adu).Msg("got message in 
the default handler")
                }
-       }()
+       })
        codec := NewMessageCodec(
                transportInstance,
                append(d._options, options.WithCustomLogger(d.log))...,
diff --git a/plc4go/internal/modbus/Writer.go b/plc4go/internal/modbus/Writer.go
index 9ad3aa3b17..6309b7a97b 100644
--- a/plc4go/internal/modbus/Writer.go
+++ b/plc4go/internal/modbus/Writer.go
@@ -59,9 +59,7 @@ func NewWriter(unitIdentifier uint8, messageCodec 
spi.MessageCodec, _options ...
 func (m *Writer) Write(ctx context.Context, writeRequest 
apiModel.PlcWriteRequest) <-chan apiModel.PlcWriteRequestResult {
        // TODO: handle context
        result := make(chan apiModel.PlcWriteRequestResult, 1)
-       m.wg.Add(1)
-       go func() {
-               defer m.wg.Done()
+       m.wg.Go(func() {
                // If we are requesting only one tag, use a
                if len(writeRequest.GetTagNames()) != 1 {
                        result <- 
spiModel.NewDefaultPlcWriteRequestResult(writeRequest, nil, errors.New("modbus 
only supports single-item requests"))
@@ -152,7 +150,7 @@ func (m *Writer) Write(ctx context.Context, writeRequest 
apiModel.PlcWriteReques
                        }
                        return nil
                }, time.Second*1)
-       }()
+       })
        return result
 }
 
diff --git a/plc4go/internal/opcua/Connection.go 
b/plc4go/internal/opcua/Connection.go
index 503e7a82c4..de7729b7cb 100644
--- a/plc4go/internal/opcua/Connection.go
+++ b/plc4go/internal/opcua/Connection.go
@@ -113,9 +113,7 @@ func (c *Connection) GetMessageCodec() spi.MessageCodec {
 func (c *Connection) ConnectWithContext(ctx context.Context) <-chan 
plc4go.PlcConnectionConnectResult {
        c.log.Trace().Msg("Connecting")
        ch := make(chan plc4go.PlcConnectionConnectResult, 1)
-       c.wg.Add(1)
-       go func() {
-               defer c.wg.Done()
+       c.wg.Go(func() {
                defer func() {
                        if err := recover(); err != nil {
                                c.fireConnectionError(errors.Errorf("panic-ed 
%v. Stack:\n%s", err, debug.Stack()), ch)
@@ -146,15 +144,13 @@ func (c *Connection) ConnectWithContext(ctx 
context.Context) <-chan plc4go.PlcCo
                }
 
                c.setupConnection(ctx, ch)
-       }()
+       })
        return ch
 }
 
 func (c *Connection) Close() <-chan plc4go.PlcConnectionCloseResult {
        results := make(chan plc4go.PlcConnectionCloseResult, 1)
-       c.wg.Add(1)
-       go func() {
-               defer c.wg.Done()
+       c.wg.Go(func() {
                result := <-c.DefaultConnection.Close()
                c.channel.onDisconnect(context.Background(), c)
                disconnectTimeout := time.NewTimer(c.disconnectTimeout)
@@ -165,7 +161,7 @@ func (c *Connection) Close() <-chan 
plc4go.PlcConnectionCloseResult {
                case <-disconnectTimeout.C:
                        results <- 
_default.NewDefaultPlcConnectionCloseResult(c, errors.Errorf("timeout after 
%s", c.disconnectTimeout))
                }
-       }()
+       })
        return results
 }
 
diff --git a/plc4go/internal/opcua/SecureChannel.go 
b/plc4go/internal/opcua/SecureChannel.go
index c24db8979a..7a7afc83ba 100644
--- a/plc4go/internal/opcua/SecureChannel.go
+++ b/plc4go/internal/opcua/SecureChannel.go
@@ -1273,11 +1273,7 @@ func (s *SecureChannel) keepAlive() {
                s.log.Warn().Msg("keepalive already running")
                return
        }
-       s.keepAliveWg.Add(1)
-       s.wg.Add(1)
-       go func() {
-               defer s.wg.Done()
-               defer s.keepAliveWg.Done()
+       s.keepAliveWg.Go(func() {
                s.keepAliveIndicator.Store(true)
                defer s.keepAliveIndicator.Store(false)
                defer s.log.Info().Msg("ending keepalive")
@@ -1424,7 +1420,7 @@ func (s *SecureChannel) keepAlive() {
                                s.log.Debug().Err(err).Msg("error submitting")
                        }
                }
-       }()
+       })
        return
 }
 
diff --git a/plc4go/internal/opcua/SubscriptionHandle.go 
b/plc4go/internal/opcua/SubscriptionHandle.go
index 6fea5c017b..24cb269a80 100644
--- a/plc4go/internal/opcua/SubscriptionHandle.go
+++ b/plc4go/internal/opcua/SubscriptionHandle.go
@@ -219,11 +219,7 @@ func (h *SubscriptionHandle) 
onSubscribeCreateMonitoredItemsRequest() (readWrite
 func (h *SubscriptionHandle) startSubscriber() {
        h.log.Trace().Msg("Starting Subscription")
 
-       h.subscriberWg.Add(1)
-       h.wg.Add(1)
-       go func() {
-               defer h.wg.Done()
-               defer h.subscriberWg.Done()
+       h.subscriberWg.Go(func() {
 
                var outstandingAcknowledgements 
[]readWriteModel.SubscriptionAcknowledgement
                var outstandingRequests []uint32
@@ -353,7 +349,7 @@ func (h *SubscriptionHandle) startSubscriber() {
                //Wait for any outstanding responses to arrive, using the 
request timeout length
                //sleep(this.revisedCycleTime * 10);
                h.complete = true
-       }()
+       })
 }
 
 // stopSubscriber stops the subscriber either on disconnect or on error
diff --git a/plc4go/internal/s7/Connection.go b/plc4go/internal/s7/Connection.go
index 7e1157a778..ad7a21c562 100644
--- a/plc4go/internal/s7/Connection.go
+++ b/plc4go/internal/s7/Connection.go
@@ -121,9 +121,7 @@ func (c *Connection) GetMessageCodec() spi.MessageCodec {
 func (c *Connection) ConnectWithContext(ctx context.Context) <-chan 
plc4go.PlcConnectionConnectResult {
        c.log.Trace().Msg("Connecting")
        ch := make(chan plc4go.PlcConnectionConnectResult, 1)
-       c.wg.Add(1)
-       go func() {
-               defer c.wg.Done()
+       c.wg.Go(func() {
                defer func() {
                        if err := recover(); err != nil {
                                ch <- 
_default.NewDefaultPlcConnectionConnectResult(nil, errors.Errorf("panic-ed %v. 
Stack: %s", err, debug.Stack()))
@@ -156,7 +154,7 @@ func (c *Connection) ConnectWithContext(ctx 
context.Context) <-chan plc4go.PlcCo
                c.log.Info().Msg("S7 Driver running in ACTIVE mode.")
 
                c.setupConnection(ctx, ch)
-       }()
+       })
        return ch
 }
 
diff --git a/plc4go/internal/s7/Reader.go b/plc4go/internal/s7/Reader.go
index d734353dd2..70a3603417 100644
--- a/plc4go/internal/s7/Reader.go
+++ b/plc4go/internal/s7/Reader.go
@@ -65,9 +65,7 @@ func (m *Reader) Read(ctx context.Context, readRequest 
apiModel.PlcReadRequest)
        // TODO: handle ctx
        m.log.Trace().Msg("Reading")
        result := make(chan apiModel.PlcReadRequestResult, 1)
-       m.wg.Add(1)
-       go func() {
-               defer m.wg.Done()
+       m.wg.Go(func() {
                defer func() {
                        if err := recover(); err != nil {
                                result <- 
spiModel.NewDefaultPlcReadRequestResult(readRequest, nil, 
errors.Errorf("panic-ed %v. Stack: %s", err, debug.Stack()))
@@ -116,7 +114,9 @@ func (m *Reader) Read(ctx context.Context, readRequest 
apiModel.PlcReadRequest)
                )
                // Start a new request-transaction (Is ended in the 
response-handler)
                transaction := m.tm.StartTransaction()
-               transaction.Submit(func(transaction 
transactions.RequestTransaction) {
+               transaction.Submit(func(transactionContext context.Context, 
transaction transactions.RequestTransaction) {
+                       ctx, cancel := context.WithCancel(ctx)
+                       context.AfterFunc(transactionContext, cancel)
 
                        // Send the  over the wire
                        m.log.Trace().Msg("Send ")
@@ -176,7 +176,7 @@ func (m *Reader) Read(ctx context.Context, readRequest 
apiModel.PlcReadRequest)
                                }
                        }
                })
-       }()
+       })
        return result
 }
 
diff --git a/plc4go/internal/s7/Writer.go b/plc4go/internal/s7/Writer.go
index 667fd6cab9..6c3e306ed2 100644
--- a/plc4go/internal/s7/Writer.go
+++ b/plc4go/internal/s7/Writer.go
@@ -60,9 +60,7 @@ func NewWriter(tpduGenerator *TpduGenerator, messageCodec 
spi.MessageCodec, tm t
 func (m *Writer) Write(ctx context.Context, writeRequest 
apiModel.PlcWriteRequest) <-chan apiModel.PlcWriteRequestResult {
        // TODO: handle context
        result := make(chan apiModel.PlcWriteRequestResult, 1)
-       m.wg.Add(1)
-       go func() {
-               defer m.wg.Done()
+       m.wg.Go(func() {
                defer func() {
                        if err := recover(); err != nil {
                                result <- 
spiModel.NewDefaultPlcWriteRequestResult(writeRequest, nil, 
errors.Errorf("panic-ed %v. Stack: %s", err, debug.Stack()))
@@ -109,7 +107,9 @@ func (m *Writer) Write(ctx context.Context, writeRequest 
apiModel.PlcWriteReques
 
                // Start a new request-transaction (Is ended in the 
response-handler)
                transaction := m.tm.StartTransaction()
-               transaction.Submit(func(transaction 
transactions.RequestTransaction) {
+               transaction.Submit(func(transactionContext context.Context, 
transaction transactions.RequestTransaction) {
+                       ctx, cancel := context.WithCancel(ctx)
+                       context.AfterFunc(transactionContext, cancel)
                        // Send the  over the wire
                        if err := m.messageCodec.SendRequest(ctx, tpktPacket, 
func(message spi.Message) bool {
                                tpktPacket, ok := 
message.(readWriteModel.TPKTPacket)
@@ -160,7 +160,7 @@ func (m *Writer) Write(ctx context.Context, writeRequest 
apiModel.PlcWriteReques
                                }
                        }
                })
-       }()
+       })
        return result
 }
 
diff --git a/plc4go/internal/simulated/Connection.go 
b/plc4go/internal/simulated/Connection.go
index 8d1b558835..5915360b86 100644
--- a/plc4go/internal/simulated/Connection.go
+++ b/plc4go/internal/simulated/Connection.go
@@ -91,9 +91,7 @@ func (c *Connection) Connect() <-chan 
plc4go.PlcConnectionConnectResult {
 
 func (c *Connection) ConnectWithContext(_ context.Context) <-chan 
plc4go.PlcConnectionConnectResult {
        ch := make(chan plc4go.PlcConnectionConnectResult, 1)
-       c.wg.Add(1)
-       go func() {
-               defer c.wg.Done()
+       c.wg.Go(func() {
                defer func() {
                        if err := recover(); err != nil {
                                ch <- 
_default.NewDefaultPlcConnectionCloseResult(nil, errors.Errorf("panic-ed %v. 
Stack: %s", err, debug.Stack()))
@@ -139,7 +137,7 @@ func (c *Connection) ConnectWithContext(_ context.Context) 
<-chan plc4go.PlcConn
                        // Return the connection in a connected state to the 
user.
                        ch <- _default.NewDefaultPlcConnectionConnectResult(c, 
nil)
                }
-       }()
+       })
        return ch
 }
 
@@ -149,9 +147,7 @@ func (c *Connection) BlockingClose() {
 
 func (c *Connection) Close() <-chan plc4go.PlcConnectionCloseResult {
        ch := make(chan plc4go.PlcConnectionCloseResult, 1)
-       c.wg.Add(1)
-       go func() {
-               defer c.wg.Done()
+       c.wg.Go(func() {
                defer func() {
                        if err := recover(); err != nil {
                                ch <- 
_default.NewDefaultPlcConnectionConnectResult(nil, errors.Errorf("panic-ed %v. 
Stack: %s", err, debug.Stack()))
@@ -187,7 +183,7 @@ func (c *Connection) Close() <-chan 
plc4go.PlcConnectionCloseResult {
                }
                // Return a new connection to the user.
                ch <- _default.NewDefaultPlcConnectionCloseResult(c, nil)
-       }()
+       })
        return ch
 }
 
@@ -197,9 +193,7 @@ func (c *Connection) IsConnected() bool {
 
 func (c *Connection) Ping() <-chan plc4go.PlcConnectionPingResult {
        ch := make(chan plc4go.PlcConnectionPingResult, 1)
-       c.wg.Add(1)
-       go func() {
-               defer c.wg.Done()
+       c.wg.Go(func() {
                defer func() {
                        if err := recover(); err != nil {
                                ch <- 
_default.NewDefaultPlcConnectionPingResult(errors.Errorf("panic-ed %v. Stack: 
%s", err, debug.Stack()))
@@ -242,7 +236,7 @@ func (c *Connection) Ping() <-chan 
plc4go.PlcConnectionPingResult {
                        }
                        ch <- _default.NewDefaultPlcConnectionPingResult(nil)
                }
-       }()
+       })
        return ch
 }
 
diff --git a/plc4go/internal/simulated/Connection_test.go 
b/plc4go/internal/simulated/Connection_test.go
index 9b26161ef1..41b42dff52 100644
--- a/plc4go/internal/simulated/Connection_test.go
+++ b/plc4go/internal/simulated/Connection_test.go
@@ -20,6 +20,7 @@
 package simulated
 
 import (
+       "sync"
        "testing"
        "time"
 
@@ -328,10 +329,12 @@ func TestConnection_BlockingClose(t *testing.T) {
                        timeBeforeClose := time.Now()
                        executor := func() <-chan bool {
                                ch := make(chan bool)
-                               go func() {
+                               var wg sync.WaitGroup
+                               t.Cleanup(wg.Wait)
+                               wg.Go(func() {
                                        c.BlockingClose()
                                        ch <- true
-                               }()
+                               })
                                return ch
                        }
                        timeout := time.NewTimer(3 * time.Second)
diff --git a/plc4go/internal/simulated/Reader.go 
b/plc4go/internal/simulated/Reader.go
index 4650a6f911..d44815fc03 100644
--- a/plc4go/internal/simulated/Reader.go
+++ b/plc4go/internal/simulated/Reader.go
@@ -59,9 +59,7 @@ func NewReader(device *Device, readerOptions 
map[string][]string, tracer tracer.
 
 func (r *Reader) Read(_ context.Context, readRequest apiModel.PlcReadRequest) 
<-chan apiModel.PlcReadRequestResult {
        ch := make(chan apiModel.PlcReadRequestResult, 1)
-       r.wg.Add(1)
-       go func() {
-               defer r.wg.Done()
+       r.wg.Go(func() {
                defer func() {
                        if err := recover(); err != nil {
                                ch <- 
spiModel.NewDefaultPlcReadRequestResult(readRequest, nil, 
errors.Errorf("panic-ed %v. Stack: %s", err, debug.Stack()))
@@ -111,6 +109,6 @@ func (r *Reader) Read(_ context.Context, readRequest 
apiModel.PlcReadRequest) <-
                        spiModel.NewDefaultPlcReadResponse(readRequest, 
responseCodes, responseValues),
                        nil,
                )
-       }()
+       })
        return ch
 }
diff --git a/plc4go/internal/simulated/Writer.go 
b/plc4go/internal/simulated/Writer.go
index e09e8bca17..4af1a3fe08 100644
--- a/plc4go/internal/simulated/Writer.go
+++ b/plc4go/internal/simulated/Writer.go
@@ -58,9 +58,7 @@ func NewWriter(device *Device, writerOptions 
map[string][]string, tracer tracer.
 
 func (w *Writer) Write(_ context.Context, writeRequest 
apiModel.PlcWriteRequest) <-chan apiModel.PlcWriteRequestResult {
        ch := make(chan apiModel.PlcWriteRequestResult, 1)
-       w.wg.Add(1)
-       go func() {
-               defer w.wg.Done()
+       w.wg.Go(func() {
                defer func() {
                        if err := recover(); err != nil {
                                ch <- 
spiModel.NewDefaultPlcWriteRequestResult(writeRequest, nil, 
errors.Errorf("panic-ed %v. Stack: %s", err, debug.Stack()))
@@ -99,6 +97,6 @@ func (w *Writer) Write(_ context.Context, writeRequest 
apiModel.PlcWriteRequest)
                }
                // Emit the response
                ch <- spiModel.NewDefaultPlcWriteRequestResult(writeRequest, 
spiModel.NewDefaultPlcWriteResponse(writeRequest, responseCodes), nil)
-       }()
+       })
        return ch
 }
diff --git a/plc4go/pkg/api/cache/PlcConnectionCache.go 
b/plc4go/pkg/api/cache/PlcConnectionCache.go
index 6872891bef..fc1c0ba616 100644
--- a/plc4go/pkg/api/cache/PlcConnectionCache.go
+++ b/plc4go/pkg/api/cache/PlcConnectionCache.go
@@ -162,9 +162,7 @@ func (c *plcConnectionCache) GetConnection(connectionString 
string) <-chan plc4g
 func (c *plcConnectionCache) GetConnectionWithContext(ctx context.Context, 
connectionString string) <-chan plc4go.PlcConnectionConnectResult {
        ch := make(chan plc4go.PlcConnectionConnectResult)
 
-       c.wg.Add(1)
-       go func() {
-               defer c.wg.Done()
+       c.wg.Go(func() {
                c.cacheLock.Lock()
 
                // If a connection for this connection string didn't exist yet, 
create a new container
@@ -181,9 +179,9 @@ func (c *plcConnectionCache) GetConnectionWithContext(ctx 
context.Context, conne
                        // Store the new connection container in the cache of 
connections.
                        c.connections[connectionString] = cc
                        // Initialize the connection itself.
-                       go func(cc2 *connectionContainer) {
-                               cc2.connect(ctx)
-                       }(cc)
+                       c.wg.Go(func() {
+                               cc.connect(ctx)
+                       })
                }
 
                // Get the ConnectionContainer for this connection string.
@@ -229,19 +227,17 @@ func (c *plcConnectionCache) GetConnectionWithContext(ctx 
context.Context, conne
 
                case <-maximumWaitTimeout.C: // Timeout after the maximum 
waiting time.
                        // In this case we need to drain the chan and return it 
immediate
-                       c.wg.Add(1)
-                       go func() {
-                               defer c.wg.Done()
+                       c.wg.Go(func() {
                                <-leaseChan
                                _ = connection.returnConnection(ctx, StateIdle)
-                       }()
+                       })
                        if c.tracer != nil {
                                c.tracer.AddTransactionalTrace(txId, 
"get-connection", "timeout")
                        }
                        c.log.Debug().Str("connectionString", 
connectionString).Msg("Timeout while waiting for connection.")
                        ch <- _default.NewDefaultPlcConnectionCloseResult(nil, 
errors.New("timeout while waiting for connection"))
                }
-       }()
+       })
 
        return ch
 }
@@ -250,9 +246,7 @@ func (c *plcConnectionCache) Close() <-chan 
PlcConnectionCacheCloseResult {
        c.log.Debug().Msg("Closing connection cache started.")
        ch := make(chan PlcConnectionCacheCloseResult)
 
-       c.wg.Add(1)
-       go func() {
-               defer c.wg.Done()
+       c.wg.Go(func() {
                c.log.Trace().Msg("Acquire lock")
                c.cacheLock.Lock()
                defer c.cacheLock.Unlock()
@@ -268,16 +262,16 @@ func (c *plcConnectionCache) Close() <-chan 
PlcConnectionCacheCloseResult {
                        return
                }
 
-               for _, cc := range c.connections {
-                       ccLog := c.log.With().Stringer("cc", cc).Logger()
+               for _, connectionContainer := range c.connections {
+                       ccLog := c.log.With().Stringer("connectionContainer", 
connectionContainer).Logger()
                        ccLog.Trace().Msg("Closing connection")
                        // Mark the connection as being closed to not try to 
re-establish it.
-                       cc.closed = true
+                       connectionContainer.closed = true
                        // Try to get a lease as this way we kow we're not 
closing the connection
                        // while some go func is still using it.
-                       go func(container *connectionContainer) {
+                       c.wg.Go(func() {
                                ccLog.Trace().Msg("getting a lease")
-                               leaseResults := container.lease()
+                               leaseResults := connectionContainer.lease()
                                closeTimeout := time.NewTimer(c.maxWaitTime)
                                select {
                                // We're just getting the lease as this way we 
can be sure nobody else is using it.
@@ -286,16 +280,16 @@ func (c *plcConnectionCache) Close() <-chan 
PlcConnectionCacheCloseResult {
                                case _ = <-leaseResults:
                                        ccLog.Debug().Msg("Gracefully closing 
connection ...")
                                        // Give back the connection.
-                                       if container.connection != nil {
+                                       if connectionContainer.connection != 
nil {
                                                ccLog.Trace().Msg("closing 
actual connection")
-                                               container.connection.Close()
+                                               
connectionContainer.connection.Close()
                                        }
                                // If we're timing out brutally kill the 
connection.
                                case <-closeTimeout.C:
                                        ccLog.Debug().Msg("Forcefully closing 
connection ...")
                                        // Forcefully close this connection.
-                                       if container.connection != nil {
-                                               container.connection.Close()
+                                       if connectionContainer.connection != 
nil {
+                                               
connectionContainer.connection.Close()
                                        }
                                }
 
@@ -306,9 +300,9 @@ func (c *plcConnectionCache) Close() <-chan 
PlcConnectionCacheCloseResult {
                                case <-responseDeliveryTimeout.C:
                                }
                                c.log.Debug().Msg("Closing connection cache 
finished.")
-                       }(cc)
+                       })
                }
-       }()
+       })
 
        return ch
 }
diff --git a/plc4go/pkg/api/cache/PlcConnectionCache_test.go 
b/plc4go/pkg/api/cache/PlcConnectionCache_test.go
index d0f7ac1ff9..0013453a2e 100644
--- a/plc4go/pkg/api/cache/PlcConnectionCache_test.go
+++ b/plc4go/pkg/api/cache/PlcConnectionCache_test.go
@@ -20,6 +20,7 @@
 package cache
 
 import (
+       "sync"
        "testing"
        "time"
 
@@ -230,11 +231,9 @@ func (c *plcConnectionCache) readFromPlc(t *testing.T, 
preConnectJob func(), con
                        closeResults := connection.Close()
                        // Wait for the connection to be correctly closed.
                        closeResult := <-closeResults
-                       c.wg.Add(1)
-                       go func() {
-                               defer c.wg.Done()
+                       c.wg.Go(func() {
                                ch <- 
(closeResult.(_default.DefaultPlcConnectionCloseResult)).GetTraces()
-                       }()
+                       })
                }()
 
                // Prepare a read request.
@@ -264,9 +263,7 @@ func (c *plcConnectionCache) readFromPlc(t *testing.T, 
preConnectJob func(), con
 
 func (c *plcConnectionCache) executeAndTestReadFromPlc(t *testing.T, 
preConnectJob func(), connectionString string, resourceString string, 
expectedTraceEntries []string, expectedNumTotalConnections int) <-chan struct{} 
{
        ch := make(chan struct{})
-       c.wg.Add(1)
-       go func() {
-               defer c.wg.Done()
+       c.wg.Go(func() {
                // Read once from the c.
                traces := <-c.readFromPlc(t, preConnectJob, connectionString, 
resourceString)
 
@@ -287,7 +284,7 @@ func (c *plcConnectionCache) executeAndTestReadFromPlc(t 
*testing.T, preConnectJ
                        t.Errorf("Expected %d connections in the c but got %d", 
expectedNumTotalConnections, len(c.connections))
                }
                ch <- struct{}{}
-       }()
+       })
        return ch
 }
 
@@ -875,8 +872,10 @@ func TestPlcConnectionCache_MaximumWaitTimeReached(t 
*testing.T) {
        // The third connection should be given up by the cache
        thirdConnectionResults := 
cache.GetConnection("simulated://1.2.3.4:42?connectionDelay=100&pingDelay=4000&traceEnabled=true")
 
+       var wg sync.WaitGroup
+       t.Cleanup(wg.Wait)
        // Just make sure the first two connections are returned as soon as 
they are received
-       go func() {
+       wg.Go(func() {
                select {
                case connectionResult := <-firstConnectionResults:
                        if assert.NotNil(t, connectionResult) {
@@ -888,8 +887,8 @@ func TestPlcConnectionCache_MaximumWaitTimeReached(t 
*testing.T) {
                case <-time.After(5 * time.Second):
                        t.Errorf("Timeout")
                }
-       }()
-       go func() {
+       })
+       wg.Go(func() {
                select {
                case connectionResult := <-secondConnectionResults:
                        if assert.NotNil(t, connectionResult) {
@@ -901,7 +900,7 @@ func TestPlcConnectionCache_MaximumWaitTimeReached(t 
*testing.T) {
                case <-time.After(5 * time.Second):
                        t.Errorf("Timeout")
                }
-       }()
+       })
 
        // Now wait for the last connection to be timed out by the cache
        select {
diff --git a/plc4go/pkg/api/cache/connectionContainer.go 
b/plc4go/pkg/api/cache/connectionContainer.go
index 92d73b4a24..a203d631af 100644
--- a/plc4go/pkg/api/cache/connectionContainer.go
+++ b/plc4go/pkg/api/cache/connectionContainer.go
@@ -162,11 +162,9 @@ func (c *connectionContainer) lease() <-chan 
plc4go.PlcConnectionConnectResult {
                // is definitely eagerly waiting for input.
                c.log.Debug().Str("connectionString", c.connectionString).
                        Msg("Got lease instantly as connection was idle.")
-               c.wg.Add(1)
-               go func() {
-                       defer c.wg.Done()
+               c.wg.Go(func() {
                        ch <- 
_default.NewDefaultPlcConnectionConnectResult(connection, nil)
-               }()
+               })
        case StateInUse, StateInitialized:
                // If the connection is currently busy or not finished 
initializing,
                // add the new channel to the queue for this connection.
@@ -213,9 +211,7 @@ func (c *connectionContainer) returnConnection(ctx 
context.Context, newState cac
                // Send asynchronously as the receiver might have given up 
waiting,
                // and we don'c want anything to block here. 1ms should be 
enough for
                // the calling process to reach the blocking read.
-               c.wg.Add(1)
-               go func() {
-                       defer c.wg.Done()
+               c.wg.Go(func() {
                        // In this case we don'c need to check for blocks
                        // as the getConnection function of the connection cache
                        // is definitely eagerly waiting for input.
@@ -223,7 +219,7 @@ func (c *connectionContainer) returnConnection(ctx 
context.Context, newState cac
                        c.log.Debug().Str("connectionString", 
c.connectionString).
                                Int("waiting-queue-size", len(c.queue)).
                                Msg("Returned connection to the next client 
waiting.")
-               }()
+               })
        } else {
                // Otherwise, just mark the connection as idle.
                c.log.Debug().Str("connectionString", c.connectionString).
diff --git a/plc4go/pkg/api/cache/plcConnectionLease.go 
b/plc4go/pkg/api/cache/plcConnectionLease.go
index aef78f60a8..e2860442c9 100644
--- a/plc4go/pkg/api/cache/plcConnectionLease.go
+++ b/plc4go/pkg/api/cache/plcConnectionLease.go
@@ -98,9 +98,7 @@ func (t *plcConnectionLease) Close() <-chan 
plc4go.PlcConnectionCloseResult {
 
        result := make(chan plc4go.PlcConnectionCloseResult, 1)
 
-       t.wg.Add(1)
-       go func() {
-               defer t.wg.Done()
+       t.wg.Go(func() {
                // Check if the connection is still alive, if it is, put it 
back into the cache
                pingResults := t.Ping()
                pingTimeout := time.NewTimer(5 * time.Second)
@@ -143,7 +141,7 @@ func (t *plcConnectionLease) Close() <-chan 
plc4go.PlcConnectionCloseResult {
 
                // Finish closing the connection.
                result <- 
_default.NewDefaultPlcConnectionCloseResultWithTraces(t, err, traces)
-       }()
+       })
 
        return result
 }
diff --git a/plc4go/spi/default/DefaultBrowser.go 
b/plc4go/spi/default/DefaultBrowser.go
index d7d6c73300..69cf6722b3 100644
--- a/plc4go/spi/default/DefaultBrowser.go
+++ b/plc4go/spi/default/DefaultBrowser.go
@@ -78,9 +78,7 @@ func (m *defaultBrowser) Browse(ctx context.Context, 
browseRequest apiModel.PlcB
 
 func (m *defaultBrowser) BrowseWithInterceptor(ctx context.Context, 
browseRequest apiModel.PlcBrowseRequest, interceptor func(result 
apiModel.PlcBrowseItem) bool) <-chan apiModel.PlcBrowseRequestResult {
        result := make(chan apiModel.PlcBrowseRequestResult, 1)
-       m.wg.Add(1)
-       go func() {
-               defer m.wg.Done()
+       m.wg.Go(func() {
                defer func() {
                        if err := recover(); err != nil {
                                m.log.Error().
@@ -101,6 +99,6 @@ func (m *defaultBrowser) BrowseWithInterceptor(ctx 
context.Context, browseReques
                        browseResponse,
                        nil,
                )
-       }()
+       })
        return result
 }
diff --git a/plc4go/spi/default/DefaultCodec.go 
b/plc4go/spi/default/DefaultCodec.go
index b4fc71dae8..498695a483 100644
--- a/plc4go/spi/default/DefaultCodec.go
+++ b/plc4go/spi/default/DefaultCodec.go
@@ -215,21 +215,21 @@ func (m *defaultCodec) TimeoutExpectations(now time.Time) 
{
                        // Remove this expectation from the list.
                        m.log.Debug().Stringer("expectation", 
expectation).Msg("timeout expectation")
                        // Call the error handler.
-                       go func(expectation spi.Expectation) {
+                       m.wg.Go(func() {
                                if err := 
expectation.GetHandleError()(utils.NewTimeoutError(expectation.GetExpiration().Sub(expectation.GetCreationTime())));
 err != nil {
                                        m.log.Error().Err(err).Msg("Got an 
error handling error on expectation")
                                }
-                       }(expectation)
+                       })
                        return true
                }
                if err := expectation.GetContext().Err(); err != nil {
                        m.log.Debug().Err(err).Stringer("expectation", 
expectation).Msg("expectation canceled")
                        // Remove this expectation from the list.
-                       go func(expectation spi.Expectation) {
+                       m.wg.Go(func() {
                                if err := expectation.GetHandleError()(err); 
err != nil {
                                        m.log.Error().Err(err).Msg("Got an 
error handling error on expectation")
                                }
-                       }(expectation)
+                       })
                        return true
                }
                return false
@@ -252,11 +252,11 @@ func (m *defaultCodec) HandleMessages(message 
spi.Message) bool {
                        if err := expectation.GetHandleMessage()(message); err 
!= nil {
                                expectationLog.Debug().Err(err).Msg("errored 
handling the message")
                                // Pass the error to the error handler.
-                               go func(expectation spi.Expectation) {
+                               m.wg.Go(func() {
                                        if err := 
expectation.GetHandleError()(err); err != nil {
                                                m.log.Error().Err(err).Msg("Got 
an error handling error on expectation")
                                        }
-                               }(expectation)
+                               })
                                return false
                        }
                        m.log.Trace().Msg("message handled")
@@ -273,12 +273,10 @@ func (m *defaultCodec) HandleMessages(message 
spi.Message) bool {
 
 func (m *defaultCodec) startWorker() {
        m.log.Trace().Msg("starting worker")
-       m.activeWorker.Add(1)
-       go m.Work()
+       m.activeWorker.Go(m.Work)
 }
 
 func (m *defaultCodec) Work() {
-       defer m.activeWorker.Done()
        workerLog := m.log.With().Logger()
        if !m.traceDefaultMessageCodecWorker {
                workerLog = zerolog.Nop()
@@ -335,16 +333,14 @@ mainLoop:
                var err error
                {
                        syncer := make(chan struct{})
-                       m.wg.Add(1)
-                       go func() {
-                               defer m.wg.Done()
+                       m.wg.Go(func() {
                                defer close(syncer)
                                if !m.running.Load() {
                                        err = errors.New("not running")
                                        return
                                }
                                message, err = m.Receive()
-                       }()
+                       })
                        timeoutTimer := time.NewTimer(m.receiveTimeout)
                        select {
                        case <-syncer:
diff --git a/plc4go/spi/default/DefaultCodec_test.go 
b/plc4go/spi/default/DefaultCodec_test.go
index c0ba8ea0c7..074901ab9a 100644
--- a/plc4go/spi/default/DefaultCodec_test.go
+++ b/plc4go/spi/default/DefaultCodec_test.go
@@ -23,6 +23,7 @@ import (
        "context"
        "fmt"
        "os"
+       "sync"
        "sync/atomic"
        "testing"
        "time"
@@ -1440,11 +1441,13 @@ func Test_defaultCodec_Work(t *testing.T) {
                        if tt.manipulator != nil {
                                tt.manipulator(t, m)
                        }
-                       go func() {
+                       var wg sync.WaitGroup
+                       t.Cleanup(wg.Wait)
+                       wg.Go(func() {
                                // Stop after 200ms
                                time.Sleep(200 * time.Millisecond)
                                m.running.Store(false)
-                       }()
+                       })
                        m.Work()
                })
        }
diff --git a/plc4go/spi/default/DefaultConnection.go 
b/plc4go/spi/default/DefaultConnection.go
index e55573ca94..00a823d843 100644
--- a/plc4go/spi/default/DefaultConnection.go
+++ b/plc4go/spi/default/DefaultConnection.go
@@ -152,9 +152,7 @@ func (d *defaultConnection) Connect() <-chan 
plc4go.PlcConnectionConnectResult {
 func (d *defaultConnection) ConnectWithContext(ctx context.Context) <-chan 
plc4go.PlcConnectionConnectResult {
        d.log.Trace().Msg("Connecting")
        ch := make(chan plc4go.PlcConnectionConnectResult, 1)
-       d.wg.Add(1)
-       go func() {
-               defer d.wg.Done()
+       d.wg.Go(func() {
                defer func() {
                        if err := recover(); err != nil {
                                ch <- NewDefaultPlcConnectionConnectResult(nil, 
errors.Errorf("panic-ed %v. Stack: %s", err, debug.Stack()))
@@ -164,7 +162,7 @@ func (d *defaultConnection) ConnectWithContext(ctx 
context.Context) <-chan plc4g
                d.SetConnected(true)
                connection := d.GetConnection()
                ch <- NewDefaultPlcConnectionConnectResult(connection, err)
-       }()
+       })
        return ch
 }
 
@@ -217,9 +215,7 @@ func (d *defaultConnection) IsConnected() bool {
 
 func (d *defaultConnection) Ping() <-chan plc4go.PlcConnectionPingResult {
        ch := make(chan plc4go.PlcConnectionPingResult, 1)
-       d.wg.Add(1)
-       go func() {
-               defer d.wg.Done()
+       d.wg.Go(func() {
                defer func() {
                        if err := recover(); err != nil {
                                ch <- 
NewDefaultPlcConnectionPingResult(errors.Errorf("panic-ed %v. Stack: %s", err, 
debug.Stack()))
@@ -230,7 +226,7 @@ func (d *defaultConnection) Ping() <-chan 
plc4go.PlcConnectionPingResult {
                } else {
                        ch <- NewDefaultPlcConnectionPingResult(errors.New("not 
connected"))
                }
-       }()
+       })
        return ch
 }
 
diff --git a/plc4go/spi/model/DefaultPlcReadRequest.go 
b/plc4go/spi/model/DefaultPlcReadRequest.go
index de5c4a2109..09e6a2b0fd 100644
--- a/plc4go/spi/model/DefaultPlcReadRequest.go
+++ b/plc4go/spi/model/DefaultPlcReadRequest.go
@@ -150,9 +150,7 @@ func (d *DefaultPlcReadRequest) 
ExecuteWithContextAndInterceptor(ctx context.Con
 
        // Create a new result-channel, which completes as soon as all 
sub-result-channels have returned
        resultChannel := make(chan apiModel.PlcReadRequestResult, 1)
-       d.wg.Add(1)
-       go func() {
-               defer d.wg.Done()
+       d.wg.Go(func() {
                defer func() {
                        if err := recover(); err != nil {
                                resultChannel <- 
NewDefaultPlcReadRequestResult(d, nil, errors.Errorf("panic-ed %v. Stack: %s", 
err, debug.Stack()))
@@ -173,6 +171,6 @@ func (d *DefaultPlcReadRequest) 
ExecuteWithContextAndInterceptor(ctx context.Con
                result := d.readRequestInterceptor.ProcessReadResponses(ctx, d, 
subResults)
                // Return the final result
                resultChannel <- result
-       }()
+       })
        return resultChannel
 }
diff --git a/plc4go/spi/model/DefaultPlcUnsubscriptionRequest.go 
b/plc4go/spi/model/DefaultPlcUnsubscriptionRequest.go
index 6b9e570318..8450551d4d 100644
--- a/plc4go/spi/model/DefaultPlcUnsubscriptionRequest.go
+++ b/plc4go/spi/model/DefaultPlcUnsubscriptionRequest.go
@@ -70,9 +70,7 @@ func (d *DefaultPlcUnsubscriptionRequest) Execute() <-chan 
apiModel.PlcUnsubscri
 
 func (d *DefaultPlcUnsubscriptionRequest) ExecuteWithContext(ctx 
context.Context) <-chan apiModel.PlcUnsubscriptionRequestResult {
        results := make(chan apiModel.PlcUnsubscriptionRequestResult, 1)
-       d.wg.Add(1)
-       go func() {
-               defer d.wg.Done()
+       d.wg.Go(func() {
                var collectedErrors []error
                for _, handle := range d.subscriptionHandles {
                        select {
@@ -90,7 +88,7 @@ func (d *DefaultPlcUnsubscriptionRequest) 
ExecuteWithContext(ctx context.Context
                        finalErr = errors.Wrap(err, "error unsubscribing from 
all")
                }
                results <- NewDefaultPlcUnsubscriptionRequestResult(d, 
NewDefaultPlcUnsubscriptionResponse(d), finalErr)
-       }()
+       })
        return results
 }
 
diff --git a/plc4go/spi/model/DefaultPlcWriteRequest.go 
b/plc4go/spi/model/DefaultPlcWriteRequest.go
index dc7c2e7d84..c99cd6776d 100644
--- a/plc4go/spi/model/DefaultPlcWriteRequest.go
+++ b/plc4go/spi/model/DefaultPlcWriteRequest.go
@@ -170,9 +170,7 @@ func (d *DefaultPlcWriteRequest) 
ExecuteWithContextAndInterceptor(ctx context.Co
 
        // Create a new result-channel, which completes as soon as all 
sub-result-channels have returned
        resultChannel := make(chan apiModel.PlcWriteRequestResult, 1)
-       d.wg.Add(1)
-       go func() {
-               defer d.wg.Done()
+       d.wg.Go(func() {
                defer func() {
                        if err := recover(); err != nil {
                                resultChannel <- 
NewDefaultPlcWriteRequestResult(d, nil, errors.Errorf("panic-ed %v. Stack: %s", 
err, debug.Stack()))
@@ -193,7 +191,7 @@ func (d *DefaultPlcWriteRequest) 
ExecuteWithContextAndInterceptor(ctx context.Co
                result := d.writeRequestInterceptor.ProcessWriteResponses(ctx, 
d, subResults)
                // Return the final result
                resultChannel <- result
-       }()
+       })
        return resultChannel
 }
 
diff --git a/plc4go/spi/pool/WorkerPool.go b/plc4go/spi/pool/WorkerPool.go
index 7611ce242e..8bbc1c0149 100644
--- a/plc4go/spi/pool/WorkerPool.go
+++ b/plc4go/spi/pool/WorkerPool.go
@@ -27,7 +27,7 @@ import (
        "github.com/apache/plc4x/plc4go/spi/options"
 )
 
-type Runnable func()
+type Runnable func(ctx context.Context)
 
 type CompletionFuture interface {
        AwaitCompletion(ctx context.Context) error
diff --git a/plc4go/spi/pool/WorkerPool_test.go 
b/plc4go/spi/pool/WorkerPool_test.go
index 5436391537..19f4b61d42 100644
--- a/plc4go/spi/pool/WorkerPool_test.go
+++ b/plc4go/spi/pool/WorkerPool_test.go
@@ -20,7 +20,9 @@
 package pool
 
 import (
+       "context"
        "math/rand"
+       "sync"
        "testing"
        "time"
 
@@ -137,21 +139,29 @@ func TestNewDynamicExecutor(t *testing.T) {
                                        timeToBecomeUnused = 100 * 
time.Millisecond
                                }
                                t.Log("fill some jobs")
-                               go func() {
+                               var wg sync.WaitGroup
+                               t.Cleanup(wg.Wait)
+                               wg.Go(func() {
                                        for i := 0; i < 500; i++ {
                                                e.workItems <- workItem{
                                                        workItemId: int32(i),
-                                                       runnable: func() {
+                                                       runnable: 
func(runnableCtx context.Context) {
+                                                               ctx, cancel := 
context.WithCancel(t.Context())
+                                                               
context.AfterFunc(runnableCtx, cancel)
                                                                max := 100
                                                                min := 10
                                                                sleepTime := 
time.Duration(rand.Intn(max-min)+min) * time.Millisecond
                                                                
t.Logf("Sleeping for %v", sleepTime)
-                                                               
time.Sleep(sleepTime)
+                                                               timer := 
time.NewTimer(sleepTime)
+                                                               select {
+                                                               case <-timer.C:
+                                                               case 
<-ctx.Done():
+                                                               }
                                                        },
                                                        completionFuture: 
&future{},
                                                }
                                        }
-                               }()
+                               })
                        },
                        executorValidator: func(t *testing.T, e 
*dynamicExecutor) bool {
                                time.Sleep(500 * time.Millisecond)
diff --git a/plc4go/spi/pool/dynamicExecutor.go 
b/plc4go/spi/pool/dynamicExecutor.go
index 748f49f0eb..f3796d6217 100644
--- a/plc4go/spi/pool/dynamicExecutor.go
+++ b/plc4go/spi/pool/dynamicExecutor.go
@@ -44,8 +44,6 @@ type dynamicExecutor struct {
        interrupter            chan struct{}
 
        wg sync.WaitGroup // use to track spawned go routines
-
-       dynamicWorkers sync.WaitGroup
 }
 
 func newDynamicExecutor(queueDepth, maxNumberOfWorkers int, log 
zerolog.Logger) *dynamicExecutor {
@@ -65,18 +63,14 @@ func (e *dynamicExecutor) Start() {
        if e.interrupter != nil {
                e.log.Debug().Msg("Ensuring that the old spawner/killers are 
not running")
                close(e.interrupter)
-               e.dynamicWorkers.Wait()
+               e.wg.Wait()
        }
 
        e.executor.Start()
        mutex := sync.Mutex{}
        e.interrupter = make(chan struct{})
        // Worker spawner
-       e.wg.Add(1)
-       go func() {
-               defer e.wg.Done()
-               e.dynamicWorkers.Add(1)
-               defer e.dynamicWorkers.Done()
+       e.wg.Go(func() {
                defer func() {
                        if err := recover(); err != nil {
                                e.log.Error().
@@ -123,13 +117,9 @@ func (e *dynamicExecutor) Start() {
                        }()
                }
                workerLog.Info().Msg("Terminated")
-       }()
+       })
        // Worker killer
-       e.dynamicWorkers.Add(1)
-       e.wg.Add(1)
-       go func() {
-               defer e.wg.Done()
-               defer e.dynamicWorkers.Done()
+       e.wg.Go(func() {
                defer func() {
                        if err := recover(); err != nil {
                                e.log.Error().
@@ -181,7 +171,7 @@ func (e *dynamicExecutor) Start() {
                        }()
                }
                workerLog.Info().Msg("Terminated")
-       }()
+       })
 }
 
 func (e *dynamicExecutor) Stop() {
@@ -199,7 +189,7 @@ func (e *dynamicExecutor) Stop() {
        e.log.Debug().
                Interface("currentNumberOfWorkers", 
e.currentNumberOfWorkers.Load()).
                Msg("waiting for currentNumberOfWorkers dynamic workers to 
stop")
-       e.dynamicWorkers.Wait()
+       e.wg.Wait()
        e.log.Trace().Msg("stopped")
 }
 
diff --git a/plc4go/spi/pool/dynamicExecutor_test.go 
b/plc4go/spi/pool/dynamicExecutor_test.go
index 536d1544e9..0b66250570 100644
--- a/plc4go/spi/pool/dynamicExecutor_test.go
+++ b/plc4go/spi/pool/dynamicExecutor_test.go
@@ -20,6 +20,7 @@
 package pool
 
 import (
+       "context"
        "sync/atomic"
        "testing"
        "time"
@@ -86,7 +87,7 @@ func Test_dynamicExecutor_Start(t *testing.T) {
                        },
                        setup: func(t *testing.T, fields *fields) {
                                fields.executor.log = produceTestingLogger(t)
-                               fields.executor.workItems <- workItem{1, func() 
{}, &future{}}
+                               fields.executor.workItems <- workItem{1, 
func(context.Context) {}, &future{}}
                        },
                },
                {
@@ -101,7 +102,7 @@ func Test_dynamicExecutor_Start(t *testing.T) {
                        },
                        setup: func(t *testing.T, fields *fields) {
                                fields.executor.log = produceTestingLogger(t)
-                               fields.executor.workItems <- workItem{1, func() 
{}, &future{}}
+                               fields.executor.workItems <- workItem{1, 
func(context.Context) {}, &future{}}
                        },
                        startTwice: true,
                },
@@ -152,7 +153,7 @@ func Test_dynamicExecutor_Stop(t *testing.T) {
                        },
                        setup: func(t *testing.T, fields *fields) {
                                fields.executor.log = produceTestingLogger(t)
-                               fields.executor.workItems <- workItem{1, func() 
{}, &future{}}
+                               fields.executor.workItems <- workItem{1, 
func(context.Context) {}, &future{}}
                        },
                },
                {
@@ -167,7 +168,7 @@ func Test_dynamicExecutor_Stop(t *testing.T) {
                        },
                        setup: func(t *testing.T, fields *fields) {
                                fields.executor.log = produceTestingLogger(t)
-                               fields.executor.workItems <- workItem{1, func() 
{}, &future{}}
+                               fields.executor.workItems <- workItem{1, 
func(context.Context) {}, &future{}}
                        },
                },
                {
@@ -182,7 +183,7 @@ func Test_dynamicExecutor_Stop(t *testing.T) {
                        },
                        setup: func(t *testing.T, fields *fields) {
                                fields.executor.log = produceTestingLogger(t)
-                               fields.executor.workItems <- workItem{1, func() 
{}, &future{}}
+                               fields.executor.workItems <- workItem{1, 
func(context.Context) {}, &future{}}
                        },
                        stopTwice: true,
                },
diff --git a/plc4go/spi/pool/executor.go b/plc4go/spi/pool/executor.go
index 22a5977fdb..a63e6e8bc3 100644
--- a/plc4go/spi/pool/executor.go
+++ b/plc4go/spi/pool/executor.go
@@ -39,6 +39,9 @@ type executor struct {
        workItems    chan workItem
        traceWorkers bool
 
+       ctx       context.Context
+       ctxCancel context.CancelFunc
+
        stateChange     sync.RWMutex
        workerWaitGroup sync.WaitGroup
 
@@ -50,6 +53,7 @@ func newExecutor(queueDepth int, numberOfInitialWorkers int, 
customLogger zerolo
                workItems: make(chan workItem, queueDepth),
                log:       customLogger,
        }
+       e.ctx, e.ctxCancel = context.WithCancel(context.Background())
        workers := make([]*worker, numberOfInitialWorkers)
        for i := 0; i < numberOfInitialWorkers; i++ {
                w := newWorker(customLogger, i, e)
@@ -71,6 +75,10 @@ func (e *executor) getWorkerWaitGroup() *sync.WaitGroup {
        return &e.workerWaitGroup
 }
 
+func (e *executor) getCtx() context.Context {
+       return e.ctx
+}
+
 func (e *executor) Submit(ctx context.Context, workItemId int32, runnable 
Runnable) CompletionFuture {
        if runnable == nil {
                value := atomic.Value{}
@@ -127,11 +135,13 @@ func (e *executor) Stop() {
        }
        e.shutdown = true
        for i := 0; i < len(e.worker); i++ {
+               e.log.Debug().Int("workerId", i).Msg("stopping worker")
                e.worker[i].stop(true)
        }
        e.running = false
        e.shutdown = false
        e.log.Debug().Int("nWorkers", len(e.worker)).Msg("waiting for nWorkers 
workers to stop")
+       e.ctxCancel()
        e.workerWaitGroup.Wait()
        e.log.Trace().Msg("stopped")
 }
diff --git a/plc4go/spi/pool/executor_plc4xgen.go 
b/plc4go/spi/pool/executor_plc4xgen.go
index 873919c971..48aa2406f1 100644
--- a/plc4go/spi/pool/executor_plc4xgen.go
+++ b/plc4go/spi/pool/executor_plc4xgen.go
@@ -93,6 +93,44 @@ func (d *executor) SerializeWithWriteBuffer(ctx 
context.Context, writeBuffer uti
        if err := writeBuffer.WriteBit("traceWorkers", d.traceWorkers); err != 
nil {
                return err
        }
+
+       if d.ctx != nil {
+               if serializableField, ok := any(d.ctx).(utils.Serializable); ok 
{
+                       if err := writeBuffer.PushContext("ctx"); err != nil {
+                               return err
+                       }
+                       if err := 
serializableField.SerializeWithWriteBuffer(ctx, writeBuffer); err != nil {
+                               return err
+                       }
+                       if err := writeBuffer.PopContext("ctx"); err != nil {
+                               return err
+                       }
+               } else {
+                       stringValue := fmt.Sprintf("%v", d.ctx)
+                       if err := writeBuffer.WriteString("ctx", 
uint32(len(stringValue)*8), stringValue); err != nil {
+                               return err
+                       }
+               }
+       }
+
+       if d.ctxCancel != nil {
+               if serializableField, ok := 
any(d.ctxCancel).(utils.Serializable); ok {
+                       if err := writeBuffer.PushContext("ctxCancel"); err != 
nil {
+                               return err
+                       }
+                       if err := 
serializableField.SerializeWithWriteBuffer(ctx, writeBuffer); err != nil {
+                               return err
+                       }
+                       if err := writeBuffer.PopContext("ctxCancel"); err != 
nil {
+                               return err
+                       }
+               } else {
+                       stringValue := fmt.Sprintf("%v", d.ctxCancel)
+                       if err := writeBuffer.WriteString("ctxCancel", 
uint32(len(stringValue)*8), stringValue); err != nil {
+                               return err
+                       }
+               }
+       }
        if err := writeBuffer.PopContext("executor"); err != nil {
                return err
        }
diff --git a/plc4go/spi/pool/executor_test.go b/plc4go/spi/pool/executor_test.go
index 1a9d658bf8..b67d6bcd22 100644
--- a/plc4go/spi/pool/executor_test.go
+++ b/plc4go/spi/pool/executor_test.go
@@ -259,9 +259,12 @@ func Test_executor_Submit(t *testing.T) {
                        },
                        args: args{
                                workItemId: 13,
-                               runnable: func() {
+                               runnable: func(ctx context.Context) {
                                        // We do something for 3 seconds
-                                       <-time.NewTimer(3 * time.Second).C
+                                       select {
+                                       case <-time.NewTimer(3 * time.Second).C:
+                                       case <-ctx.Done():
+                                       }
                                },
                                context: func() context.Context {
                                        ctx, cancelFunc := 
context.WithCancel(context.Background())
@@ -281,9 +284,12 @@ func Test_executor_Submit(t *testing.T) {
                        },
                        args: args{
                                workItemId: 13,
-                               runnable: func() {
+                               runnable: func(ctx context.Context) {
                                        // We do something for 3 seconds
-                                       <-time.NewTimer(3 * time.Second).C
+                                       select {
+                                       case <-time.NewTimer(3 * time.Second).C:
+                                       case <-ctx.Done():
+                                       }
                                },
                                context: t.Context(),
                        },
@@ -306,7 +312,7 @@ func Test_executor_Submit(t *testing.T) {
                        }(),
                        args: args{
                                workItemId: 13,
-                               runnable: func() {
+                               runnable: func(_ context.Context) {
                                        // NOOP
                                },
                                context: t.Context(),
diff --git a/plc4go/spi/pool/worker.go b/plc4go/spi/pool/worker.go
index 8a11c41428..60b95cbe6e 100644
--- a/plc4go/spi/pool/worker.go
+++ b/plc4go/spi/pool/worker.go
@@ -20,6 +20,7 @@
 package pool
 
 import (
+       "context"
        "runtime/debug"
        "sync"
        "sync/atomic"
@@ -35,6 +36,7 @@ type worker struct {
                isTraceWorkers() bool
                getWorksItems() chan workItem
                getWorkerWaitGroup() *sync.WaitGroup
+               getCtx() context.Context
        }
 
        lastReceived atomic.Value
@@ -52,6 +54,7 @@ func newWorker(localLog zerolog.Logger, workerId int, 
executor interface {
        isTraceWorkers() bool
        getWorksItems() chan workItem
        getWorkerWaitGroup() *sync.WaitGroup
+       getCtx() context.Context
 }) *worker {
        w := &worker{
                id:       workerId,
@@ -82,9 +85,8 @@ func (w *worker) start() {
        if w.executor.isTraceWorkers() {
                w.log.Debug().Stringer("worker", w).Msg("Starting worker")
        }
-       w.executor.getWorkerWaitGroup().Add(1)
        w.running.Store(true)
-       go w.work()
+       w.executor.getWorkerWaitGroup().Go(w.work)
 }
 
 func (w *worker) stop(interrupt bool) {
@@ -106,7 +108,6 @@ func (w *worker) stop(interrupt bool) {
 }
 
 func (w *worker) work() {
-       defer w.executor.getWorkerWaitGroup().Done()
        defer func() {
                if err := recover(); err != nil {
                        w.log.Error().
@@ -137,12 +138,15 @@ func (w *worker) work() {
                                // TODO: do we need to complete with a error?
                        } else {
                                workItemLog.Debug().Msg("Running work item")
-                               _workItem.runnable()
+                               _workItem.runnable(w.executor.getCtx())
                                _workItem.completionFuture.complete()
                                workItemLog.Debug().Msg("work item completed")
                        }
                case <-w.interrupter:
                        workerLog.Debug().Msg("We got interrupted")
+               case <-w.executor.getCtx().Done():
+                       workerLog.Debug().Msg("Ctx done")
+                       return
                }
        }
        workerLog.Trace().Msg("done")
diff --git a/plc4go/spi/pool/worker_test.go b/plc4go/spi/pool/worker_test.go
index bdd96b12cf..1e1a8060ab 100644
--- a/plc4go/spi/pool/worker_test.go
+++ b/plc4go/spi/pool/worker_test.go
@@ -20,6 +20,7 @@
 package pool
 
 import (
+       "context"
        "sync"
        "sync/atomic"
        "testing"
@@ -37,6 +38,7 @@ func Test_worker_initialize(t *testing.T) {
                        isTraceWorkers() bool
                        getWorksItems() chan workItem
                        getWorkerWaitGroup() *sync.WaitGroup
+                       getCtx() context.Context
                }
        }
        tests := []struct {
@@ -67,6 +69,7 @@ func Test_worker_start(t *testing.T) {
                        isTraceWorkers() bool
                        getWorksItems() chan workItem
                        getWorkerWaitGroup() *sync.WaitGroup
+                       getCtx() context.Context
                }
                lastReceived atomic.Value
                interrupter  chan struct{}
@@ -86,7 +89,7 @@ func Test_worker_start(t *testing.T) {
                                        }
                                        e.workItems <- workItem{
                                                workItemId: 0,
-                                               runnable: func() {
+                                               runnable: func(context.Context) 
{
                                                        // No-op
                                                },
                                                completionFuture: &future{},
@@ -105,7 +108,7 @@ func Test_worker_start(t *testing.T) {
                                        }
                                        e.workItems <- workItem{
                                                workItemId: 0,
-                                               runnable: func() {
+                                               runnable: func(context.Context) 
{
                                                        // No-op
                                                },
                                                completionFuture: &future{},
@@ -146,6 +149,7 @@ func Test_worker_stop(t *testing.T) {
                        isTraceWorkers() bool
                        getWorksItems() chan workItem
                        getWorkerWaitGroup() *sync.WaitGroup
+                       getCtx() context.Context
                }
                lastReceived atomic.Value
                interrupter  chan struct{}
@@ -168,15 +172,17 @@ func Test_worker_stop(t *testing.T) {
                                                workItems:    make(chan 
workItem),
                                                traceWorkers: true,
                                        }
-                                       go func() {
+                                       var wg sync.WaitGroup
+                                       t.Cleanup(wg.Wait)
+                                       wg.Go(func() {
                                                e.workItems <- workItem{
                                                        workItemId: 0,
-                                                       runnable: func() {
+                                                       runnable: 
func(context.Context) {
                                                                // No-op
                                                        },
                                                        completionFuture: 
&future{},
                                                }
-                                       }()
+                                       })
                                        return e
                                }(),
                        },
@@ -226,15 +232,17 @@ func Test_worker_work(t *testing.T) {
                                                workItems:    make(chan 
workItem),
                                                traceWorkers: true,
                                        }
-                                       go func() {
+                                       var wg sync.WaitGroup
+                                       t.Cleanup(wg.Wait)
+                                       wg.Go(func() {
                                                e.workItems <- workItem{
                                                        workItemId: 0,
-                                                       runnable: func() {
+                                                       runnable: 
func(context.Context) {
                                                                panic("Oh no 
what should I do???")
                                                        },
                                                        completionFuture: 
&future{},
                                                }
-                                       }()
+                                       })
                                        return e
                                }(),
                        },
@@ -260,15 +268,17 @@ func Test_worker_work(t *testing.T) {
                                                workItems:    make(chan 
workItem),
                                                traceWorkers: true,
                                        }
-                                       go func() {
+                                       var wg sync.WaitGroup
+                                       t.Cleanup(wg.Wait)
+                                       wg.Go(func() {
                                                e.workItems <- workItem{
                                                        workItemId: 0,
-                                                       runnable: func() {
+                                                       runnable: 
func(context.Context) {
                                                                
time.Sleep(time.Millisecond * 70)
                                                        },
                                                        completionFuture: 
&future{},
                                                }
-                                       }()
+                                       })
                                        return e
                                }(),
                        },
@@ -318,17 +328,19 @@ func Test_worker_work(t *testing.T) {
                                                workItems:    make(chan 
workItem),
                                                traceWorkers: true,
                                        }
-                                       go func() {
+                                       var wg sync.WaitGroup
+                                       t.Cleanup(wg.Wait)
+                                       wg.Go(func() {
                                                completionFuture := &future{}
                                                
completionFuture.cancelRequested.Store(true)
                                                e.workItems <- workItem{
                                                        workItemId: 0,
-                                                       runnable: func() {
+                                                       runnable: 
func(context.Context) {
                                                                
time.Sleep(time.Millisecond * 70)
                                                        },
                                                        completionFuture: 
completionFuture,
                                                }
-                                       }()
+                                       })
                                        return e
                                }(),
                        },
diff --git a/plc4go/spi/transactions/RequestTransaction.go 
b/plc4go/spi/transactions/RequestTransaction.go
index 2b6be82778..4b0aa6467b 100644
--- a/plc4go/spi/transactions/RequestTransaction.go
+++ b/plc4go/spi/transactions/RequestTransaction.go
@@ -129,9 +129,9 @@ func (t *requestTransaction) Submit(operation 
RequestTransactionRunnable) {
                t.log.Warn().Msg("Operation already set")
        }
        t.log.Trace().Int32("transactionId", t.transactionId).Msg("Submission")
-       t.operation = func() {
+       t.operation = func(ctx context.Context) {
                t.log.Trace().Int32("transactionId", 
t.transactionId).Msg("Start operation")
-               operation(t)
+               operation(ctx, t)
                t.log.Trace().Int32("transactionId", 
t.transactionId).Msg("Completed operation")
        }
        t.parent.submitTransaction(t)
diff --git a/plc4go/spi/transactions/RequestTransactionManager.go 
b/plc4go/spi/transactions/RequestTransactionManager.go
index b4dd3c7fb4..0fc5dcf055 100644
--- a/plc4go/spi/transactions/RequestTransactionManager.go
+++ b/plc4go/spi/transactions/RequestTransactionManager.go
@@ -60,7 +60,7 @@ func init() {
        }()
 }
 
-type RequestTransactionRunnable func(RequestTransaction)
+type RequestTransactionRunnable func(context.Context, RequestTransaction)
 
 // RequestTransactionManager handles transactions
 type RequestTransactionManager interface {
@@ -77,7 +77,7 @@ type RequestTransactionManager interface {
 func NewRequestTransactionManager(numberOfConcurrentRequests int, _options 
...options.WithOption) RequestTransactionManager {
        extractTraceTransactionManagerTransactions, _ := 
options.ExtractTraceTransactionManagerTransactions(_options...)
        customLogger := 
options.ExtractCustomLoggerOrDefaultToGlobal(_options...)
-       _requestTransactionManager := &requestTransactionManager{
+       rtm := &requestTransactionManager{
                numberOfConcurrentRequests: numberOfConcurrentRequests,
                currentTransactionId:       0,
                workLog:                    *list.New(),
@@ -87,13 +87,14 @@ func 
NewRequestTransactionManager(numberOfConcurrentRequests int, _options ...op
 
                log: customLogger,
        }
+       rtm.ctx, rtm.cancelCtx = context.WithCancel(context.Background())
        for _, option := range _options {
                switch option := option.(type) {
                case *withCustomExecutor:
-                       _requestTransactionManager.executor = option.executor
+                       rtm.executor = option.executor
                }
        }
-       return _requestTransactionManager
+       return rtm
 }
 
 // WithCustomExecutor sets a custom Executor for the RequestTransactionManager
@@ -129,6 +130,9 @@ type requestTransactionManager struct {
 
        shutdown atomic.Bool // Indicates it this rtm is in shutdown
 
+       ctx       context.Context    `ignore:"true"`
+       cancelCtx context.CancelFunc `ignore:"true"`
+
        traceTransactionManagerTransactions bool // flag set to true if it 
should trace transactions
 
        log zerolog.Logger
@@ -184,7 +188,7 @@ func (r *requestTransactionManager) processWorklog() {
                        Int("nRunningRequests", len(r.runningRequests)).
                        Msg("Handling next. (Adding to running requests 
(length: nRunningRequests))")
                r.runningRequests = append(r.runningRequests, next)
-               completionFuture := r.executor.Submit(context.Background(), 
next.transactionId, next.operation)
+               completionFuture := r.executor.Submit(r.ctx, 
next.transactionId, next.operation)
                next.setCompletionFuture(completionFuture)
                r.workLog.Remove(front)
        }
@@ -284,6 +288,7 @@ func (r *requestTransactionManager) CloseGraceful(timeout time.Duration) error {
        } else {
                r.log.Warn().Msg("not closing shared instance")
        }
+       r.cancelCtx()
        r.log.Debug().Msg("closed")
        return nil
 }
diff --git a/plc4go/spi/transactions/RequestTransactionManager_test.go 
b/plc4go/spi/transactions/RequestTransactionManager_test.go
index 76868fa91f..484894d660 100644
--- a/plc4go/spi/transactions/RequestTransactionManager_test.go
+++ b/plc4go/spi/transactions/RequestTransactionManager_test.go
@@ -26,8 +26,8 @@ import (
        "testing"
        "time"
 
-       "github.com/rs/zerolog/log"
        "github.com/stretchr/testify/assert"
+       "github.com/stretchr/testify/require"
 
        "github.com/apache/plc4x/plc4go/spi/options"
        "github.com/apache/plc4x/plc4go/spi/pool"
@@ -39,17 +39,23 @@ func TestNewRequestTransactionManager(t *testing.T) {
                requestTransactionManagerOptions []options.WithOption
        }
        tests := []struct {
-               name  string
-               args  args
-               setup func(t *testing.T, args *args)
-               want  RequestTransactionManager
+               name       string
+               args       args
+               setup      func(t *testing.T, args *args)
+               wantAssert func(*testing.T, RequestTransactionManager) bool
        }{
                {
                        name: "just create one",
-                       want: &requestTransactionManager{
-                               workLog:  *list.New(),
-                               executor: sharedExecutorInstance,
-                               log:      log.Logger,
+                       wantAssert: func(t *testing.T, rtm 
RequestTransactionManager) bool {
+                               require.IsType(t, &requestTransactionManager{}, 
rtm)
+                               rtmi := rtm.(*requestTransactionManager)
+                               assert.NotNil(t, rtm)
+                               assert.NotNil(t, rtmi.executor)
+                               assert.Same(t, rtmi.executor, 
sharedExecutorInstance)
+                               assert.NotNil(t, rtmi.workLog)
+                               assert.NotNil(t, rtmi.ctx)
+                               assert.NotNil(t, rtmi.cancelCtx)
+                               return true
                        },
                },
                {
@@ -60,11 +66,17 @@ func TestNewRequestTransactionManager(t *testing.T) {
                                        
WithCustomExecutor(sharedExecutorInstance),
                                },
                        },
-                       want: &requestTransactionManager{
-                               numberOfConcurrentRequests: 2,
-                               workLog:                    *list.New(),
-                               executor:                   
sharedExecutorInstance,
-                               log:                        log.Logger,
+                       wantAssert: func(t *testing.T, rtm 
RequestTransactionManager) bool {
+                               require.IsType(t, &requestTransactionManager{}, 
rtm)
+                               rtmi := rtm.(*requestTransactionManager)
+                               assert.NotNil(t, rtm)
+                               assert.NotNil(t, rtmi.executor)
+                               assert.Same(t, rtmi.executor, 
sharedExecutorInstance)
+                               assert.NotNil(t, rtmi.workLog)
+                               assert.NotNil(t, rtmi.ctx)
+                               assert.NotNil(t, rtmi.cancelCtx)
+                               assert.Equal(t, 2, 
rtmi.numberOfConcurrentRequests)
+                               return true
                        },
                },
        }
@@ -73,8 +85,9 @@ func TestNewRequestTransactionManager(t *testing.T) {
                        if tt.setup != nil {
                                tt.setup(t, &tt.args)
                        }
-                       if got := 
NewRequestTransactionManager(tt.args.numberOfConcurrentRequests, 
tt.args.requestTransactionManagerOptions...); !assert.Equal(t, tt.want, got) {
-                               t.Errorf("NewRequestTransactionManager() = %v, 
want %v", got, tt.want)
+                       got := 
NewRequestTransactionManager(tt.args.numberOfConcurrentRequests, 
tt.args.requestTransactionManagerOptions...)
+                       if !assert.True(t, tt.wantAssert(t, got)) {
+                               t.Errorf("NewRequestTransactionManager() = %v", 
got)
                        }
                })
        }
@@ -458,7 +471,7 @@ func Test_requestTransactionManager_submitTransaction(t 
*testing.T) {
                        name: "submit it",
                        args: args{
                                handle: &requestTransaction{
-                                       operation: func() {
+                                       operation: func(context.Context) {
                                                // doesn't matter
                                        },
                                },
@@ -492,6 +505,8 @@ func Test_requestTransactionManager_Close(t *testing.T) {
                currentTransactionId                int32
                workLog                             list.List
                executor                            pool.Executor
+               ctx                                 context.Context
+               cancelCtx                           context.CancelFunc
                traceTransactionManagerTransactions bool
        }
        tests := []struct {
@@ -503,6 +518,7 @@ func Test_requestTransactionManager_Close(t *testing.T) {
                {
                        name: "close it",
                        setup: func(t *testing.T, fields *fields) {
+                               fields.ctx, fields.cancelCtx = 
context.WithCancel(t.Context())
                                executor := NewMockExecutor(t)
                                executor.EXPECT().Close().Return(nil)
                                fields.executor = executor
@@ -522,6 +538,8 @@ func Test_requestTransactionManager_Close(t *testing.T) {
                                workLog:                             
tt.fields.workLog,
                                executor:                            
tt.fields.executor,
                                traceTransactionManagerTransactions: 
tt.fields.traceTransactionManagerTransactions,
+                               ctx:                                 
tt.fields.ctx,
+                               cancelCtx:                           
tt.fields.cancelCtx,
                                log:                                 
produceTestingLogger(t),
                        }
                        tt.wantErr(t, r.Close(), fmt.Sprintf("Close()"))
@@ -536,6 +554,8 @@ func Test_requestTransactionManager_CloseGraceful(t 
*testing.T) {
                currentTransactionId                int32
                workLog                             list.List
                executor                            pool.Executor
+               ctx                                 context.Context
+               cancelCtx                           context.CancelFunc
                traceTransactionManagerTransactions bool
        }
        type args struct {
@@ -551,6 +571,7 @@ func Test_requestTransactionManager_CloseGraceful(t 
*testing.T) {
                {
                        name: "close it",
                        setup: func(t *testing.T, fields *fields) {
+                               fields.ctx, fields.cancelCtx = 
context.WithCancel(t.Context())
                                executor := NewMockExecutor(t)
                                executor.EXPECT().Close().Return(nil)
                                fields.executor = executor
@@ -563,6 +584,7 @@ func Test_requestTransactionManager_CloseGraceful(t 
*testing.T) {
                                timeout: 20 * time.Millisecond,
                        },
                        setup: func(t *testing.T, fields *fields) {
+                               fields.ctx, fields.cancelCtx = 
context.WithCancel(t.Context())
                                executor := NewMockExecutor(t)
                                executor.EXPECT().Close().Return(nil)
                                fields.executor = executor
@@ -580,6 +602,7 @@ func Test_requestTransactionManager_CloseGraceful(t 
*testing.T) {
                                timeout: 20 * time.Millisecond,
                        },
                        setup: func(t *testing.T, fields *fields) {
+                               fields.ctx, fields.cancelCtx = 
context.WithCancel(t.Context())
                                executor := NewMockExecutor(t)
                                executor.EXPECT().Close().Return(nil)
                                fields.executor = executor
@@ -599,6 +622,8 @@ func Test_requestTransactionManager_CloseGraceful(t 
*testing.T) {
                                workLog:                             
tt.fields.workLog,
                                executor:                            
tt.fields.executor,
                                traceTransactionManagerTransactions: 
tt.fields.traceTransactionManagerTransactions,
+                               ctx:                                 
tt.fields.ctx,
+                               cancelCtx:                           
tt.fields.cancelCtx,
                                log:                                 
produceTestingLogger(t),
                        }
                        tt.wantErr(t, r.CloseGraceful(tt.args.timeout), 
fmt.Sprintf("CloseGraceful(%v)", tt.args.timeout))
diff --git a/plc4go/spi/transactions/RequestTransaction_test.go 
b/plc4go/spi/transactions/RequestTransaction_test.go
index 2f8ead7770..57ec0990ba 100644
--- a/plc4go/spi/transactions/RequestTransaction_test.go
+++ b/plc4go/spi/transactions/RequestTransaction_test.go
@@ -21,6 +21,7 @@ package transactions
 
 import (
        "context"
+       "sync"
        "testing"
        "time"
 
@@ -230,7 +231,7 @@ func Test_requestTransaction_Submit(t1 *testing.T) {
                                parent: &requestTransactionManager{},
                        },
                        args: args{
-                               operation: func(_ RequestTransaction) {
+                               operation: func(context.Context, 
RequestTransaction) {
                                        // NOOP
                                },
                        },
@@ -239,12 +240,12 @@ func Test_requestTransaction_Submit(t1 *testing.T) {
                        name: "submit something again",
                        fields: fields{
                                parent: &requestTransactionManager{},
-                               operation: func() {
+                               operation: func(context.Context) {
                                        // NOOP
                                },
                        },
                        args: args{
-                               operation: func(_ RequestTransaction) {
+                               operation: func(context.Context, 
RequestTransaction) {
                                        // NOOP
                                },
                        },
@@ -253,29 +254,29 @@ func Test_requestTransaction_Submit(t1 *testing.T) {
                        name: "submit completed",
                        fields: fields{
                                parent: &requestTransactionManager{},
-                               operation: func() {
+                               operation: func(context.Context) {
                                        // NOOP
                                },
                                completed: true,
                        },
                        args: args{
-                               operation: func(_ RequestTransaction) {
+                               operation: func(context.Context, 
RequestTransaction) {
                                        // NOOP
                                },
                        },
                },
        }
        for _, tt := range tests {
-               t1.Run(tt.name, func(t1 *testing.T) {
-                       t := &requestTransaction{
+               t1.Run(tt.name, func(t *testing.T) {
+                       rt := &requestTransaction{
                                parent:        tt.fields.parent,
                                transactionId: tt.fields.transactionId,
                                operation:     tt.fields.operation,
                                log:           tt.fields.transactionLog,
                                completed:     tt.fields.completed,
                        }
-                       t.Submit(tt.args.operation)
-                       t.operation()
+                       rt.Submit(tt.args.operation)
+                       rt.operation(t.Context())
                })
        }
 }
@@ -317,13 +318,15 @@ func Test_requestTransaction_AwaitCompletion(t1 
*testing.T) {
                                
expect.AwaitCompletion(mock.Anything).Return(nil)
                                var completionFuture pool.CompletionFuture = 
completionFutureMock
                                
transaction.completionFuture.Store(&completionFuture)
-                               go func() {
+                               var wg sync.WaitGroup
+                               t.Cleanup(wg.Wait)
+                               wg.Go(func() {
                                        time.Sleep(100 * time.Millisecond)
                                        r := transaction.parent
                                        r.workLogMutex.RLock()
                                        defer r.workLogMutex.RUnlock()
                                        r.runningRequests = 
append(r.runningRequests, &requestTransaction{transactionId: 1})
-                               }()
+                               })
                        },
                },
        }
diff --git a/plc4go/spi/transports/pcap/TransportInstance.go 
b/plc4go/spi/transports/pcap/TransportInstance.go
index f61ab760bc..d6ecc63452 100644
--- a/plc4go/spi/transports/pcap/TransportInstance.go
+++ b/plc4go/spi/transports/pcap/TransportInstance.go
@@ -55,6 +55,8 @@ type TransportInstance struct {
        handle    *pcap.Handle
        reader    *bufio.Reader
 
+       wg sync.WaitGroup
+
        log zerolog.Logger
 }
 
@@ -95,7 +97,7 @@ func (m *TransportInstance) Connect() error {
        buffer := new(bytes.Buffer)
        m.reader = bufio.NewReader(buffer)
 
-       go func(m *TransportInstance, buffer *bytes.Buffer) {
+       m.wg.Go(func() {
                defer func() {
                        if err := recover(); err != nil {
                                m.log.Error().
@@ -161,7 +163,7 @@ func (m *TransportInstance) Connect() error {
                        buffer.Write(payload)
                        lastPacketTime = &captureInfo.Timestamp
                }
-       }(m, buffer)
+       })
 
        return nil
 }
@@ -174,6 +176,7 @@ func (m *TransportInstance) Close() error {
                handle.Close()
        }
        m.connected.Store(false)
+       m.wg.Wait()
        return nil
 }
 
diff --git a/plc4go/spi/transports/tcp/TransportInstance_test.go 
b/plc4go/spi/transports/tcp/TransportInstance_test.go
index 4f847343bf..6dc0ff69cb 100644
--- a/plc4go/spi/transports/tcp/TransportInstance_test.go
+++ b/plc4go/spi/transports/tcp/TransportInstance_test.go
@@ -23,6 +23,7 @@ import (
        "bufio"
        "context"
        "net"
+       "sync"
        "testing"
 
        "github.com/rs/zerolog/log"
@@ -102,9 +103,11 @@ func TestTransportInstance_Close(t *testing.T) {
                                        t.Cleanup(func() {
                                                assert.NoError(t, 
listener.Close())
                                        })
-                                       go func() {
+                                       var wg sync.WaitGroup
+                                       t.Cleanup(wg.Wait)
+                                       wg.Go(func() {
                                                _, _ = listener.Accept()
-                                       }()
+                                       })
                                        tcp, err := net.DialTCP("tcp", nil, 
listener.Addr().(*net.TCPAddr))
                                        require.NoError(t, err)
                                        t.Cleanup(func() {
@@ -206,9 +209,11 @@ func TestTransportInstance_ConnectWithContext(t 
*testing.T) {
                                        t.Cleanup(func() {
                                                assert.NoError(t, 
listener.Close())
                                        })
-                                       go func() {
+                                       var wg sync.WaitGroup
+                                       t.Cleanup(wg.Wait)
+                                       wg.Go(func() {
                                                _, _ = listener.Accept()
-                                       }()
+                                       })
                                        return listener.Addr().(*net.TCPAddr)
                                }(),
                        },
@@ -401,9 +406,11 @@ func TestTransportInstance_Write(t *testing.T) {
                                t.Cleanup(func() {
                                        assert.NoError(t, listener.Close())
                                })
-                               go func() {
+                               var wg sync.WaitGroup
+                               t.Cleanup(wg.Wait)
+                               wg.Go(func() {
                                        _, _ = listener.Accept()
-                               }()
+                               })
                                tcp, err := net.DialTCP("tcp", nil, 
listener.Addr().(*net.TCPAddr))
                                require.NoError(t, err)
                                t.Cleanup(func() {
diff --git a/plc4go/spi/transports/udp/TransportInstance.go 
b/plc4go/spi/transports/udp/TransportInstance.go
index b66ecb8f7c..291b997481 100644
--- a/plc4go/spi/transports/udp/TransportInstance.go
+++ b/plc4go/spi/transports/udp/TransportInstance.go
@@ -109,9 +109,7 @@ func (m *TransportInstance) ConnectWithContext(ctx 
context.Context) error {
        }
 
        // TODO: Start a worker that uses m.udpConn.ReadFromUDP() to fill a buffer
-       /*m.wg.Add(1)
-       go func() {
-               defer m.wg.Done()
+       /*      m.wg.Go(func() {
            buf := make([]byte, 1024)
            for {
                rsize, raddr, err := m.udpConn.ReadFromUDP(buf)
diff --git a/plc4go/spi/transports/utils/DefaultBufferedTransportInstance.go 
b/plc4go/spi/transports/utils/DefaultBufferedTransportInstance.go
index 5b1381fb25..cb8eaac832 100644
--- a/plc4go/spi/transports/utils/DefaultBufferedTransportInstance.go
+++ b/plc4go/spi/transports/utils/DefaultBufferedTransportInstance.go
@@ -64,9 +64,7 @@ type defaultBufferedTransportInstance struct {
 // ConnectWithContext is a compatibility implementation for those transports not implementing this function
 func (m *defaultBufferedTransportInstance) ConnectWithContext(ctx context.Context) error {
        ch := make(chan error, 1)
-       m.wg.Add(1)
-       go func() {
-               defer m.wg.Done()
+       m.wg.Go(func() {
                defer func() {
                        if err := recover(); err != nil {
                                m.log.Error().
@@ -77,7 +75,7 @@ func (m *defaultBufferedTransportInstance) 
ConnectWithContext(ctx context.Contex
                }()
                ch <- m.Connect()
                close(ch)
-       }()
+       })
        select {
        case err := <-ch:
                return err
diff --git a/plc4go/spi/utils/StopWarn.go b/plc4go/spi/utils/StopWarn.go
index 246ae1fac7..cfdeea6581 100644
--- a/plc4go/spi/utils/StopWarn.go
+++ b/plc4go/spi/utils/StopWarn.go
@@ -58,9 +58,7 @@ func StopWarn(localLog zerolog.Logger, opts 
...func(*stopWarnOptions)) func() {
        ticker := time.NewTicker(o.interval)
        wg := new(sync.WaitGroup)
        done := make(chan struct{})
-       wg.Add(1)
-       go func() {
-               defer wg.Done()
+       wg.Go(func() {
                localLog.Trace().Msgf("start checking")
                startTime := time.Now()
                for {
@@ -94,7 +92,7 @@ func StopWarn(localLog zerolog.Logger, opts 
...func(*stopWarnOptions)) func() {
                                        Msgf("%sstill in progress", processId)
                        }
                }
-       }()
+       })
        start := time.Now()
        return func() {
                localLog.Trace().TimeDiff("check duration", time.Now(), 
start).Msg("done")

Reply via email to