This is an automated email from the ASF dual-hosted git repository.
sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git
The following commit(s) were added to refs/heads/develop by this push:
new 878c4084e8 fix: unified wait group/go func usage
878c4084e8 is described below
commit 878c4084e8ccfe62ef7895432eca5a3694912d13
Author: Sebastian Rühl <[email protected]>
AuthorDate: Fri Nov 7 13:17:11 2025 +0100
fix: unified wait group/go func usage
feat: added ctx to executor
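The unification replaces the manual wg.Add(1) / go func() { defer wg.Done(); ... }() idiom with sync.WaitGroup.Go throughout. A minimal before/after sketch of the pattern (assuming Go 1.25+, where WaitGroup.Go was added; the worker bodies are illustrative):

package main

import (
	"fmt"
	"sync"
)

func main() {
	var wg sync.WaitGroup

	// Old idiom, removed throughout this commit: Add/Done are easy to
	// mismatch, especially around early returns and panics.
	wg.Add(1)
	go func() {
		defer wg.Done()
		fmt.Println("old-style worker")
	}()

	// New idiom: WaitGroup.Go performs the Add and the deferred Done itself.
	wg.Go(func() {
		fmt.Println("new-style worker")
	})

	wg.Wait()
}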
---
plc4go/internal/ads/Browser.go | 6 +--
plc4go/internal/ads/Connection.go | 12 ++---
plc4go/internal/ads/Discoverer.go | 6 +--
plc4go/internal/ads/Interactions.go | 36 +++++--------
plc4go/internal/ads/Reader.go | 12 ++---
plc4go/internal/ads/Subscriber.go | 12 ++---
plc4go/internal/ads/Writer.go | 12 ++---
.../bacnetip/ApplicationLayerMessageCodec.go | 24 +++------
plc4go/internal/bacnetip/Connection.go | 12 ++---
plc4go/internal/bacnetip/Discoverer.go | 20 +++-----
plc4go/internal/bacnetip/Reader.go | 10 ++--
plc4go/internal/bacnetip/Subscriber.go | 6 +--
plc4go/internal/bacnetip/bacgopes/core/core.go | 11 ++--
.../internal/bacnetip/bacgopes/iocb/iocb_IOCB.go | 6 +--
.../bacnetip/bacgopes/iocb/iocb_IOQueue.go | 6 +--
.../bacnetip/bacgopes/udp/udp_UDPDirector.go | 18 +++----
plc4go/internal/cbus/Browser.go | 6 +--
plc4go/internal/cbus/CBusMessageMapper_test.go | 45 +++++++++--------
plc4go/internal/cbus/Connection.go | 28 +++-------
plc4go/internal/cbus/Connection_test.go | 12 ++---
plc4go/internal/cbus/Reader.go | 4 +-
plc4go/internal/cbus/Subscriber.go | 6 +--
plc4go/internal/cbus/Writer.go | 10 ++--
plc4go/internal/eip/Connection.go | 12 ++---
plc4go/internal/eip/Reader.go | 10 ++--
plc4go/internal/eip/Writer.go | 6 +--
plc4go/internal/knxnetip/Connection.go | 24 +++------
.../knxnetip/ConnectionDriverSpecificOperations.go | 42 +++++----------
plc4go/internal/knxnetip/ConnectionHelper.go | 10 ++--
plc4go/internal/knxnetip/Discoverer.go | 26 +++++-----
plc4go/internal/knxnetip/Reader.go | 6 +--
plc4go/internal/knxnetip/Subscriber.go | 6 +--
plc4go/internal/modbus/AsciiDriver.go | 6 +--
plc4go/internal/modbus/Connection.go | 6 +--
plc4go/internal/modbus/Reader.go | 6 +--
plc4go/internal/modbus/RtuDriver.go | 6 +--
plc4go/internal/modbus/TcpDriver.go | 6 +--
plc4go/internal/modbus/Writer.go | 6 +--
plc4go/internal/opcua/Connection.go | 12 ++---
plc4go/internal/opcua/SecureChannel.go | 8 +--
plc4go/internal/opcua/SubscriptionHandle.go | 8 +--
plc4go/internal/s7/Connection.go | 6 +--
plc4go/internal/s7/Reader.go | 10 ++--
plc4go/internal/s7/Writer.go | 10 ++--
plc4go/internal/simulated/Connection.go | 18 +++----
plc4go/internal/simulated/Connection_test.go | 7 ++-
plc4go/internal/simulated/Reader.go | 6 +--
plc4go/internal/simulated/Writer.go | 6 +--
plc4go/pkg/api/cache/PlcConnectionCache.go | 44 +++++++---------
plc4go/pkg/api/cache/PlcConnectionCache_test.go | 23 ++++-----
plc4go/pkg/api/cache/connectionContainer.go | 12 ++---
plc4go/pkg/api/cache/plcConnectionLease.go | 6 +--
plc4go/spi/default/DefaultBrowser.go | 6 +--
plc4go/spi/default/DefaultCodec.go | 22 ++++----
plc4go/spi/default/DefaultCodec_test.go | 7 ++-
plc4go/spi/default/DefaultConnection.go | 12 ++---
plc4go/spi/model/DefaultPlcReadRequest.go | 6 +--
.../spi/model/DefaultPlcUnsubscriptionRequest.go | 6 +--
plc4go/spi/model/DefaultPlcWriteRequest.go | 6 +--
plc4go/spi/pool/WorkerPool.go | 2 +-
plc4go/spi/pool/WorkerPool_test.go | 18 +++++--
plc4go/spi/pool/dynamicExecutor.go | 22 +++-----
plc4go/spi/pool/dynamicExecutor_test.go | 11 ++--
plc4go/spi/pool/executor.go | 10 ++++
plc4go/spi/pool/executor_plc4xgen.go | 38 ++++++++++++++
plc4go/spi/pool/executor_test.go | 16 ++++--
plc4go/spi/pool/worker.go | 12 +++--
plc4go/spi/pool/worker_test.go | 40 ++++++++++-----
plc4go/spi/transactions/RequestTransaction.go | 4 +-
.../spi/transactions/RequestTransactionManager.go | 15 ++++--
.../transactions/RequestTransactionManager_test.go | 59 +++++++++++++++-------
plc4go/spi/transactions/RequestTransaction_test.go | 25 +++++----
plc4go/spi/transports/pcap/TransportInstance.go | 7 ++-
.../spi/transports/tcp/TransportInstance_test.go | 19 ++++---
plc4go/spi/transports/udp/TransportInstance.go | 4 +-
.../utils/DefaultBufferedTransportInstance.go | 6 +--
plc4go/spi/utils/StopWarn.go | 6 +--
77 files changed, 492 insertions(+), 561 deletions(-)
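For the second change (ctx in the executor), pool.Runnable now receives the worker's context, and RequestTransaction.Submit passes a transaction context to its callback, as the hunks below show. A small sketch of what the new callback signature enables (the Runnable type and dispatch function here are illustrative stand-ins, not the actual plc4go pool implementation):

package main

import (
	"context"
	"fmt"
)

// Runnable mirrors the new shape in plc4go/spi/pool: work items receive
// the worker's context so executor shutdown can reach into the work.
type Runnable func(workerCtx context.Context)

// dispatch is a toy stand-in for the executor's worker loop.
func dispatch(workerCtx context.Context, work Runnable) {
	work(workerCtx)
}

func main() {
	workerCtx, cancel := context.WithCancel(context.Background())
	cancel() // simulate an executor that is already shutting down
	dispatch(workerCtx, func(workerCtx context.Context) {
		if err := workerCtx.Err(); err != nil {
			fmt.Println("skipping work, executor is done:", err)
			return
		}
		fmt.Println("doing work")
	})
}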
diff --git a/plc4go/internal/ads/Browser.go b/plc4go/internal/ads/Browser.go
index 9044702b1c..a301e28c52 100644
--- a/plc4go/internal/ads/Browser.go
+++ b/plc4go/internal/ads/Browser.go
@@ -44,9 +44,7 @@ func (m *Connection) Browse(ctx context.Context, browseRequest apiModel.PlcBrows
func (m *Connection) BrowseWithInterceptor(ctx context.Context, browseRequest apiModel.PlcBrowseRequest, interceptor func(result apiModel.PlcBrowseItem) bool) <-chan apiModel.PlcBrowseRequestResult {
result := make(chan apiModel.PlcBrowseRequestResult, 1)
- m.wg.Add(1)
- go func() {
- defer m.wg.Done()
+ m.wg.Go(func() {
defer func() {
if err := recover(); err != nil {
result <- spiModel.NewDefaultPlcBrowseRequestResult(browseRequest, nil, errors.Errorf("panic-ed %v. Stack: %s", err, debug.Stack()))
@@ -60,7 +58,7 @@ func (m *Connection) BrowseWithInterceptor(ctx context.Context, browseRequest ap
}
browseResponse := spiModel.NewDefaultPlcBrowseResponse(browseRequest, results, responseCodes)
result <- spiModel.NewDefaultPlcBrowseRequestResult(browseRequest, browseResponse, nil)
- }()
+ })
return result
}
diff --git a/plc4go/internal/ads/Connection.go b/plc4go/internal/ads/Connection.go
index eaf4ed3ff1..23dfae6149 100644
--- a/plc4go/internal/ads/Connection.go
+++ b/plc4go/internal/ads/Connection.go
@@ -119,9 +119,7 @@ func (m *Connection) ConnectWithContext(ctx context.Context) <-chan plc4go.PlcCo
// Reset the driver context (Actually this should not be required, but just to be on the safe side)
m.driverContext.clear()
- m.wg.Add(1)
- go func() {
- defer m.wg.Done()
+ m.wg.Go(func() {
defer func() {
if err := recover(); err != nil {
ch <- _default.NewDefaultPlcConnectionCloseResult(nil, errors.Errorf("panic-ed %v. Stack: %s", err, debug.Stack()))
@@ -133,7 +131,7 @@ func (m *Connection) ConnectWithContext(ctx context.Context) <-chan plc4go.PlcCo
}
m.setupConnection(ctx, ch)
- }()
+ })
return ch
}
@@ -178,9 +176,7 @@ func (m *Connection) setupConnection(ctx context.Context, ch chan plc4go.PlcConn
// Start the worker for handling incoming messages
// (Messages that are not responses to outgoing messages)
defaultIncomingMessageChannel := m.messageCodec.GetDefaultIncomingMessageChannel()
- m.wg.Add(1)
- go func() {
- defer m.wg.Done()
+ m.wg.Go(func() {
defer func() {
if err := recover(); err != nil {
m.log.Error().
@@ -206,7 +202,7 @@ func (m *Connection) setupConnection(ctx context.Context, ch chan plc4go.PlcConn
}
}
m.log.Info().Msg("Done waiting for messages ...")
- }()
+ })
// Subscribe for changes to the symbol or the offline-versions
versionChangeRequest, err := m.SubscriptionRequestBuilder().
diff --git a/plc4go/internal/ads/Discoverer.go b/plc4go/internal/ads/Discoverer.go
index bd525dbaff..b8c33c3838 100644
--- a/plc4go/internal/ads/Discoverer.go
+++ b/plc4go/internal/ads/Discoverer.go
@@ -167,9 +167,7 @@ func (d *Discoverer) Discover(ctx context.Context, callback func(event apiModel.
discoveryItem.socket = socket
// Start a worker to receive responses
- d.wg.Add(1)
- go func(discoveryItem *discovery) {
- defer d.wg.Done()
+ d.wg.Go(func() {
defer func() {
if err := recover(); err != nil {
d.log.Error().
@@ -271,7 +269,7 @@ func (d *Discoverer) Discover(ctx context.Context, callback func(event apiModel.
callback(plcDiscoveryItem)
}
}
- }(discoveryItem)
+ })
}
defer func() {
for _, discoveryItem := range discoveryItems {
diff --git a/plc4go/internal/ads/Interactions.go b/plc4go/internal/ads/Interactions.go
index 25dee5805c..d578ee21bb 100644
--- a/plc4go/internal/ads/Interactions.go
+++ b/plc4go/internal/ads/Interactions.go
@@ -32,9 +32,7 @@ import (
func (m *Connection) ExecuteAdsReadDeviceInfoRequest(ctx context.Context) (model.AdsReadDeviceInfoResponse, error) {
responseChannel := make(chan model.AdsReadDeviceInfoResponse, 1)
- m.wg.Add(1)
- go func() {
- defer m.wg.Done()
+ m.wg.Go(func() {
defer func() {
if err := recover(); err != nil {
m.log.Error().
@@ -67,7 +65,7 @@ func (m *Connection) ExecuteAdsReadDeviceInfoRequest(ctx context.Context) (model
m.log.Debug().Err(err).Msg("error during send request")
close(responseChannel)
}
- }()
+ })
response, err := ReadWithTimeout(ctx, responseChannel)
if err != nil {
return nil, fmt.Errorf("error reading device info: %v", err)
@@ -77,9 +75,7 @@ func (m *Connection) ExecuteAdsReadDeviceInfoRequest(ctx context.Context) (model
func (m *Connection) ExecuteAdsReadRequest(ctx context.Context, indexGroup uint32, indexOffset uint32, length uint32) (model.AdsReadResponse, error) {
responseChannel := make(chan model.AdsReadResponse, 1)
- m.wg.Add(1)
- go func() {
- defer m.wg.Done()
+ m.wg.Go(func() {
defer func() {
if err := recover(); err != nil {
m.log.Error().
@@ -112,7 +108,7 @@ func (m *Connection) ExecuteAdsReadRequest(ctx context.Context, indexGroup uint3
m.log.Debug().Err(err).Msg("error during send request")
close(responseChannel)
}
- }()
+ })
response, err := ReadWithTimeout(ctx, responseChannel)
if err != nil {
return nil, fmt.Errorf("error reading: %v", err)
@@ -122,9 +118,7 @@ func (m *Connection) ExecuteAdsReadRequest(ctx context.Context, indexGroup uint3
func (m *Connection) ExecuteAdsWriteRequest(ctx context.Context, indexGroup uint32, indexOffset uint32, data []byte) (model.AdsWriteResponse, error) {
responseChannel := make(chan model.AdsWriteResponse, 1)
- m.wg.Add(1)
- go func() {
- defer m.wg.Done()
+ m.wg.Go(func() {
defer func() {
if err := recover(); err != nil {
m.log.Error().
@@ -157,7 +151,7 @@ func (m *Connection) ExecuteAdsWriteRequest(ctx context.Context, indexGroup uint
m.log.Debug().Err(err).Msg("error during send request")
close(responseChannel)
}
- }()
+ })
response, err := ReadWithTimeout(ctx, responseChannel)
if err != nil {
return nil, fmt.Errorf("error writing: %v", err)
@@ -167,9 +161,7 @@ func (m *Connection) ExecuteAdsWriteRequest(ctx context.Context, indexGroup uint
func (m *Connection) ExecuteAdsReadWriteRequest(ctx context.Context, indexGroup uint32, indexOffset uint32, readLength uint32, items []model.AdsMultiRequestItem, writeData []byte) (model.AdsReadWriteResponse, error) {
responseChannel := make(chan model.AdsReadWriteResponse, 1)
- m.wg.Add(1)
- go func() {
- defer m.wg.Done()
+ m.wg.Go(func() {
defer func() {
if err := recover(); err != nil {
m.log.Error().
@@ -202,7 +194,7 @@ func (m *Connection) ExecuteAdsReadWriteRequest(ctx context.Context, indexGroup
m.log.Debug().Err(err).Msg("error during send request")
close(responseChannel)
}
- }()
+ })
response, err := ReadWithTimeout(ctx, responseChannel)
if err != nil {
return nil, fmt.Errorf("error writing: %v", err)
@@ -212,9 +204,7 @@ func (m *Connection) ExecuteAdsReadWriteRequest(ctx context.Context, indexGroup
func (m *Connection) ExecuteAdsAddDeviceNotificationRequest(ctx context.Context, indexGroup uint32, indexOffset uint32, length uint32, transmissionMode model.AdsTransMode, maxDelay uint32, cycleTime uint32) (model.AdsAddDeviceNotificationResponse, error) {
responseChannel := make(chan model.AdsAddDeviceNotificationResponse, 1)
- m.wg.Add(1)
- go func() {
- defer m.wg.Done()
+ m.wg.Go(func() {
defer func() {
if err := recover(); err != nil {
m.log.Error().
@@ -247,7 +237,7 @@ func (m *Connection) ExecuteAdsAddDeviceNotificationRequest(ctx context.Context,
m.log.Debug().Err(err).Msg("error during send request")
close(responseChannel)
}
- }()
+ })
response, err := ReadWithTimeout(ctx, responseChannel)
if err != nil {
return nil, fmt.Errorf("error writing: %v", err)
@@ -257,9 +247,7 @@ func (m *Connection) ExecuteAdsAddDeviceNotificationRequest(ctx context.Context,
func (m *Connection) ExecuteAdsDeleteDeviceNotificationRequest(ctx context.Context, notificationHandle uint32) (model.AdsDeleteDeviceNotificationResponse, error) {
responseChannel := make(chan model.AdsDeleteDeviceNotificationResponse, 1)
- m.wg.Add(1)
- go func() {
- defer m.wg.Done()
+ m.wg.Go(func() {
defer func() {
if err := recover(); err != nil {
m.log.Error().
@@ -292,7 +280,7 @@ func (m *Connection) ExecuteAdsDeleteDeviceNotificationRequest(ctx context.Conte
m.log.Debug().Err(err).Msg("error during send request")
close(responseChannel)
}
- }()
+ })
response, err := ReadWithTimeout(ctx, responseChannel)
if err != nil {
return nil, fmt.Errorf("error writing: %v", err)
diff --git a/plc4go/internal/ads/Reader.go b/plc4go/internal/ads/Reader.go
index 97e0971714..2888c185bb 100644
--- a/plc4go/internal/ads/Reader.go
+++ b/plc4go/internal/ads/Reader.go
@@ -44,9 +44,7 @@ func (m *Connection) ReadRequestBuilder() apiModel.PlcReadRequestBuilder {
func (m *Connection) Read(ctx context.Context, readRequest apiModel.PlcReadRequest) <-chan apiModel.PlcReadRequestResult {
m.log.Trace().Msg("Reading")
result := make(chan apiModel.PlcReadRequestResult, 1)
- m.wg.Add(1)
- go func() {
- defer m.wg.Done()
+ m.wg.Go(func() {
defer func() {
if err := recover(); err != nil {
result <- spiModel.NewDefaultPlcReadRequestResult(readRequest, nil, errors.Errorf("panic-ed %v. Stack: %s", err, debug.Stack()))
@@ -57,7 +55,7 @@ func (m *Connection) Read(ctx context.Context, readRequest apiModel.PlcReadReque
} else {
m.multiRead(ctx, readRequest, result)
}
- }()
+ })
return result
}
@@ -97,9 +95,7 @@ func (m *Connection) singleRead(ctx context.Context, readRequest apiModel.PlcRea
return
}
- m.wg.Add(1)
- go func() {
- defer m.wg.Done()
+ m.wg.Go(func() {
defer func() {
if err := recover(); err != nil {
result <- spiModel.NewDefaultPlcReadRequestResult(readRequest, nil, errors.Errorf("panic-ed %v. Stack: %s", err, debug.Stack()))
@@ -140,7 +136,7 @@ func (m *Connection) singleRead(ctx context.Context, readRequest apiModel.PlcRea
spiModel.NewDefaultPlcReadResponse(readRequest, responseCodes, plcValues),
nil,
)
- }()
+ })
}
func (m *Connection) multiRead(ctx context.Context, readRequest apiModel.PlcReadRequest, result chan apiModel.PlcReadRequestResult) {
diff --git a/plc4go/internal/ads/Subscriber.go b/plc4go/internal/ads/Subscriber.go
index 2a71df4873..8ab5f0ffae 100644
--- a/plc4go/internal/ads/Subscriber.go
+++ b/plc4go/internal/ads/Subscriber.go
@@ -101,9 +101,7 @@ func (m *Connection) Subscribe(ctx context.Context, subscriptionRequest apiModel
// Create a new result-channel, which completes as soon as all sub-result-channels have returned
globalResultChannel := make(chan apiModel.PlcSubscriptionRequestResult, 1)
- m.wg.Add(1)
- go func() {
- defer m.wg.Done()
+ m.wg.Go(func() {
defer func() {
if err := recover(); err != nil {
m.log.Error().
@@ -128,16 +126,14 @@ func (m *Connection) Subscribe(ctx context.Context, subscriptionRequest apiModel
result := m.processSubscriptionResponses(ctx, subscriptionRequest, subResults)
// Return the final result
globalResultChannel <- result
- }()
+ })
return globalResultChannel
}
func (m *Connection) subscribe(ctx context.Context, subscriptionRequest apiModel.PlcSubscriptionRequest) <-chan apiModel.PlcSubscriptionRequestResult {
responseChan := make(chan apiModel.PlcSubscriptionRequestResult, 1)
- m.wg.Add(1)
- go func() {
- defer m.wg.Done()
+ m.wg.Go(func() {
defer func() {
if err := recover(); err != nil {
responseChan <- spiModel.NewDefaultPlcSubscriptionRequestResult(subscriptionRequest, nil, errors.Errorf("panic-ed %v. Stack: %s", err, debug.Stack()))
@@ -177,7 +173,7 @@ func (m *Connection) subscribe(ctx context.Context, subscriptionRequest apiModel
)
// Store it together with the returned ADS handle.
m.subscriptions[response.GetNotificationHandle()] = subscriptionHandle
- }()
+ })
return responseChan
}
diff --git a/plc4go/internal/ads/Writer.go b/plc4go/internal/ads/Writer.go
index 35e6cfc778..025c8d9ce6 100644
--- a/plc4go/internal/ads/Writer.go
+++ b/plc4go/internal/ads/Writer.go
@@ -43,9 +43,7 @@ func (m *Connection) WriteRequestBuilder() apiModel.PlcWriteRequestBuilder {
func (m *Connection) Write(ctx context.Context, writeRequest apiModel.PlcWriteRequest) <-chan apiModel.PlcWriteRequestResult {
m.log.Trace().Msg("Writing")
result := make(chan apiModel.PlcWriteRequestResult, 1)
- m.wg.Add(1)
- go func() {
- defer m.wg.Done()
+ m.wg.Go(func() {
defer func() {
if err := recover(); err != nil {
result <- spiModel.NewDefaultPlcWriteRequestResult(writeRequest, nil, errors.Errorf("panic-ed %v. Stack: %s", err, debug.Stack()))
@@ -56,7 +54,7 @@ func (m *Connection) Write(ctx context.Context, writeRequest apiModel.PlcWriteRe
} else {
m.multiWrite(ctx, writeRequest, result)
}
- }()
+ })
return result
}
@@ -106,9 +104,7 @@ func (m *Connection) singleWrite(ctx context.Context, writeRequest apiModel.PlcW
}
data := io.GetBytes()
- m.wg.Add(1)
- go func() {
- defer m.wg.Done()
+ m.wg.Go(func() {
defer func() {
if err := recover(); err != nil {
result <- spiModel.NewDefaultPlcWriteRequestResult(writeRequest, nil, errors.Errorf("panic-ed %v. Stack: %s", err, debug.Stack()))
@@ -133,7 +129,7 @@ func (m *Connection) singleWrite(ctx context.Context, writeRequest apiModel.PlcW
}
// Return the response to the caller.
result <- spiModel.NewDefaultPlcWriteRequestResult(writeRequest, spiModel.NewDefaultPlcWriteResponse(writeRequest, responseCodes), nil)
- }()
+ })
}
func (m *Connection) multiWrite(ctx context.Context, writeRequest apiModel.PlcWriteRequest, result chan apiModel.PlcWriteRequestResult) {
diff --git a/plc4go/internal/bacnetip/ApplicationLayerMessageCodec.go b/plc4go/internal/bacnetip/ApplicationLayerMessageCodec.go
index a96285b819..9ca3f3375c 100644
--- a/plc4go/internal/bacnetip/ApplicationLayerMessageCodec.go
+++ b/plc4go/internal/bacnetip/ApplicationLayerMessageCodec.go
@@ -128,16 +128,12 @@ func (m *ApplicationLayerMessageCodec) Send(message spi.Message) error {
if err != nil {
return errors.Wrap(err, "error creating IOCB")
}
- m.wg.Add(1)
- go func() {
- defer m.wg.Done()
- m.wg.Add(1)
- go func() {
- defer m.wg.Done()
+ m.wg.Go(func() {
+ m.wg.Go(func() {
if err := m.bipSimpleApplication.RequestIO(iocb); err != nil {
m.log.Debug().Err(err).Msg("errored")
}
- }()
+ })
iocb.Wait()
if err := iocb.GetIOError(); err != nil {
// TODO: handle error
@@ -148,7 +144,7 @@ func (m *ApplicationLayerMessageCodec) Send(message spi.Message) error {
} else {
// TODO: what now?
}
- }()
+ })
return nil
}
@@ -166,16 +162,12 @@ func (m *ApplicationLayerMessageCodec) SendRequest(ctx context.Context, message
if err != nil {
return errors.Wrap(err, "error creating IOCB")
}
- m.wg.Add(1)
- go func() {
- defer m.wg.Done()
- m.wg.Add(1)
- go func() {
- defer m.wg.Done()
+ m.wg.Go(func() {
+ m.wg.Go(func() {
if err := m.bipSimpleApplication.RequestIO(iocb); err != nil {
m.log.Error().Err(err).Msg("errored")
}
- }()
+ })
iocb.Wait()
if err := iocb.GetIOError(); err != nil {
if err := handleError(err); err != nil {
@@ -216,7 +208,7 @@ func (m *ApplicationLayerMessageCodec) SendRequest(ctx context.Context, message
} else {
// TODO: what now?
}
- }()
+ })
return nil
}
diff --git a/plc4go/internal/bacnetip/Connection.go b/plc4go/internal/bacnetip/Connection.go
index 26db0bfeae..6e5af988de 100644
--- a/plc4go/internal/bacnetip/Connection.go
+++ b/plc4go/internal/bacnetip/Connection.go
@@ -91,18 +91,14 @@ func (c *Connection) GetTracer() tracer.Tracer {
func (c *Connection) ConnectWithContext(ctx context.Context) <-chan plc4go.PlcConnectionConnectResult {
c.log.Trace().Msg("Connecting")
ch := make(chan plc4go.PlcConnectionConnectResult, 1)
- c.wg.Add(1)
- go func() {
- defer c.wg.Done()
+ c.wg.Go(func() {
defer func() {
if err := recover(); err != nil {
ch <- _default.NewDefaultPlcConnectionConnectResult(nil, errors.Errorf("panic-ed %v. Stack: %s", err, debug.Stack()))
}
}()
connectionConnectResult := <-c.DefaultConnection.ConnectWithContext(ctx)
- c.wg.Add(1)
- go func() {
- defer c.wg.Done()
+ c.wg.Go(func() {
defer func() {
if err := recover(); err != nil {
ch <- _default.NewDefaultPlcConnectionConnectResult(nil, errors.Errorf("panic-ed %v. Stack: %s", err, debug.Stack()))
@@ -113,9 +109,9 @@ func (c *Connection) ConnectWithContext(ctx context.Context) <-chan plc4go.PlcCo
c.passToDefaultIncomingMessageChannel()
}
c.log.Info().Msg("Ending incoming message transfer")
- }()
+ })
ch <- connectionConnectResult
- }()
+ })
return ch
}
diff --git a/plc4go/internal/bacnetip/Discoverer.go b/plc4go/internal/bacnetip/Discoverer.go
index 7c52d29e83..8da8336889 100644
--- a/plc4go/internal/bacnetip/Discoverer.go
+++ b/plc4go/internal/bacnetip/Discoverer.go
@@ -167,16 +167,14 @@ func (d *Discoverer) broadcastAndDiscover(ctx context.Context, communicationChan
}
}
- go func(communicationChannelInstance communicationChannel) {
+ d.wg.Go(func() {
for {
if err := ctx.Err(); err != nil {
d.log.Debug().Err(err).Msg("ending")
return
}
blockingReadChan := make(chan bool)
- d.wg.Add(1)
- go func() {
- defer d.wg.Done()
+ d.wg.Go(func() {
buf := make([]byte, 4096)
n, addr, err := communicationChannelInstance.unicastConnection.ReadFrom(buf)
if err != nil {
@@ -194,7 +192,7 @@ func (d *Discoverer) broadcastAndDiscover(ctx context.Context, communicationChan
}
incomingBVLCChannel <- receivedBvlcMessage{incomingBvlc, addr}
blockingReadChan <- true
- }()
+ })
select {
case ok := <-blockingReadChan:
if !ok {
@@ -207,18 +205,16 @@ func (d *Discoverer) broadcastAndDiscover(ctx context.Context, communicationChan
return
}
}
- }(communicationChannelInstance)
+ })
- go func(communicationChannelInstance communicationChannel) {
+ d.wg.Go(func() {
for {
if err := ctx.Err(); err != nil {
d.log.Debug().Err(err).Msg("ending")
return
}
blockingReadChan := make(chan bool)
- d.wg.Add(1)
- go func() {
- defer d.wg.Done()
+ d.wg.Go(func() {
buf := make([]byte, 4096)
n, addr, err := communicationChannelInstance.broadcastConnection.ReadFrom(buf)
if err != nil {
@@ -235,7 +231,7 @@ func (d *Discoverer) broadcastAndDiscover(ctx context.Context, communicationChan
}
incomingBVLCChannel <- receivedBvlcMessage{incomingBvlc, addr}
blockingReadChan <- true
- }()
+ })
select {
case ok := <-blockingReadChan:
if !ok {
@@ -248,7 +244,7 @@ func (d *Discoverer) broadcastAndDiscover(ctx context.Context, communicationChan
return
}
}
- }(communicationChannelInstance)
+ })
}
return incomingBVLCChannel, nil
}
diff --git a/plc4go/internal/bacnetip/Reader.go b/plc4go/internal/bacnetip/Reader.go
index 447fc727e9..467fc03dba 100644
--- a/plc4go/internal/bacnetip/Reader.go
+++ b/plc4go/internal/bacnetip/Reader.go
@@ -69,9 +69,7 @@ func (m *Reader) Read(ctx context.Context, readRequest apiModel.PlcReadRequest)
// TODO: handle ctx
m.log.Trace().Msg("Reading")
result := make(chan apiModel.PlcReadRequestResult, 1)
- m.wg.Add(1)
- go func() {
- defer m.wg.Done()
+ m.wg.Go(func() {
if len(readRequest.GetTagNames()) == 0 {
result <- spiModel.NewDefaultPlcReadRequestResult(readRequest, nil, errors.New("at least one field required"))
return
@@ -138,7 +136,9 @@ func (m *Reader) Read(ctx context.Context, readRequest apiModel.PlcReadRequest)
// Start a new request-transaction (Is ended in the response-handler)
transaction := m.tm.StartTransaction()
- transaction.Submit(func(transaction transactions.RequestTransaction) {
+ transaction.Submit(func(transactionContext context.Context, transaction transactions.RequestTransaction) {
+ ctx, cancel := context.WithCancel(ctx)
+ context.AfterFunc(transactionContext, cancel)
// Send the over the wire
m.log.Trace().Msg("Send ")
if err := m.messageCodec.SendRequest(ctx, apdu, func(message spi.Message) bool {
@@ -206,7 +206,7 @@ func (m *Reader) Read(ctx context.Context, readRequest apiModel.PlcReadRequest)
}
}
})
- }()
+ })
return result
}
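The two lines added at the top of the Submit callback above (and again in the cbus, eip and knxnetip hunks below) tie the request context to the transaction context. A standalone sketch of the mechanism (context.AfterFunc is standard library, Go 1.21+; the surrounding names here are illustrative):

package main

import (
	"context"
	"fmt"
	"time"
)

func main() {
	requestCtx := context.Background()
	transactionContext, endTransaction := context.WithCancel(context.Background())

	// The pattern from the Submit callbacks: derive a cancellable child
	// of the request context, then cancel it as soon as the transaction
	// context ends, so sends stop when either side finishes first.
	ctx, cancel := context.WithCancel(requestCtx)
	defer cancel()
	stop := context.AfterFunc(transactionContext, cancel)
	defer stop()

	endTransaction()                  // the transaction ends first...
	time.Sleep(10 * time.Millisecond) // ...give AfterFunc a moment to run
	fmt.Println("derived ctx:", ctx.Err()) // prints: context canceled
}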
diff --git a/plc4go/internal/bacnetip/Subscriber.go b/plc4go/internal/bacnetip/Subscriber.go
index b95913219d..280ae2fae6 100644
--- a/plc4go/internal/bacnetip/Subscriber.go
+++ b/plc4go/internal/bacnetip/Subscriber.go
@@ -55,9 +55,7 @@ func NewSubscriber(connection *Connection, _options ...options.WithOption) *Subs
func (m *Subscriber) Subscribe(ctx context.Context, subscriptionRequest apiModel.PlcSubscriptionRequest) <-chan apiModel.PlcSubscriptionRequestResult {
result := make(chan apiModel.PlcSubscriptionRequestResult, 1)
- m.wg.Add(1)
- go func() {
- defer m.wg.Done()
+ m.wg.Go(func() {
internalPlcSubscriptionRequest := subscriptionRequest.(*spiModel.DefaultPlcSubscriptionRequest)
// Add this subscriber to the connection.
@@ -85,7 +83,7 @@ func (m *Subscriber) Subscribe(ctx context.Context, subscriptionRequest apiModel
),
nil,
)
- }()
+ })
return result
}
diff --git a/plc4go/internal/bacnetip/bacgopes/core/core.go b/plc4go/internal/bacnetip/bacgopes/core/core.go
index bcec235c46..c7b29c1b7e 100644
--- a/plc4go/internal/bacnetip/bacgopes/core/core.go
+++ b/plc4go/internal/bacnetip/bacgopes/core/core.go
@@ -51,18 +51,15 @@ var wg sync.WaitGroup // use to track spawned go routines
func run() {
running = true
- wg.Add(1)
- go func() {
+ wg.Go(func() {
- defer wg.Done()
c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt, syscall.SIGTERM)
<-c
running = false
- }()
- wg.Add(1)
- go func() {
- defer wg.Done()
+ })
+ wg.Go(func() {
for running {
// get the next task
var delta time.Duration
@@ -112,7 +109,7 @@ func run() {
}
}
}
- }()
+ })
}
// RunOnce makes a pass through the scheduled tasks and deferred functions just
diff --git a/plc4go/internal/bacnetip/bacgopes/iocb/iocb_IOCB.go b/plc4go/internal/bacnetip/bacgopes/iocb/iocb_IOCB.go
index 5efdffb02c..d8ffd151ff 100644
--- a/plc4go/internal/bacnetip/bacgopes/iocb/iocb_IOCB.go
+++ b/plc4go/internal/bacnetip/bacgopes/iocb/iocb_IOCB.go
@@ -251,15 +251,13 @@ func (i *IOCB) SetTimeout(delay time.Duration) {
now := GetTaskManagerTime()
i.ioTimeout = time.NewTimer(delay)
i.ioTimoutCancel = make(chan struct{})
- i.wg.Add(1)
- go func() {
- defer i.wg.Done()
+ i.wg.Go(func() {
select {
case timeout := <-i.ioTimeout.C:
_ = i.Abort(utils.NewTimeoutError(now.Sub(timeout)))
case <-i.ioTimoutCancel:
}
- }()
+ })
}
}
diff --git a/plc4go/internal/bacnetip/bacgopes/iocb/iocb_IOQueue.go b/plc4go/internal/bacnetip/bacgopes/iocb/iocb_IOQueue.go
index 440e7afc26..76f9720cd3 100644
--- a/plc4go/internal/bacnetip/bacgopes/iocb/iocb_IOQueue.go
+++ b/plc4go/internal/bacnetip/bacgopes/iocb/iocb_IOQueue.go
@@ -87,12 +87,10 @@ func (i *IOQueue) Get(block bool, delay *time.Duration) (IOCBContract, error) {
if len(i.Queue) == 0 {
if delay != nil {
gotSomething := make(chan struct{})
- i.wg.Add(1)
- go func() {
- defer i.wg.Done()
+ i.wg.Go(func() {
i.notEmpty.Wait()
close(gotSomething)
- }()
+ })
timeout := time.NewTimer(*delay)
select {
case <-gotSomething:
diff --git a/plc4go/internal/bacnetip/bacgopes/udp/udp_UDPDirector.go b/plc4go/internal/bacnetip/bacgopes/udp/udp_UDPDirector.go
index e5188ba5b0..ce4c9dd6f9 100644
--- a/plc4go/internal/bacnetip/bacgopes/udp/udp_UDPDirector.go
+++ b/plc4go/internal/bacnetip/bacgopes/udp/udp_UDPDirector.go
@@ -96,19 +96,15 @@ func NewUDPDirector(localLog zerolog.Logger, address AddressTuple[string, uint16
}
d.running = true
- d.wg.Add(1)
- go func() {
- defer d.wg.Done()
+ d.wg.Go(func() {
for d.running {
d.handleRead()
}
- }()
+ })
// create the request queue
d.request = make(chan PDU)
- d.wg.Add(1)
- go func() {
- defer d.wg.Done()
+ d.wg.Go(func() {
for d.running {
pdu := <-d.request
serialize, err := pdu.GetRootMessage().Serialize()
@@ -131,7 +127,7 @@ func NewUDPDirector(localLog zerolog.Logger, address AddressTuple[string, uint16
}
localLog.Debug().Int("writtenBytes", writtenBytes).Msg("written bytes")
}
- }()
+ })
// start with an empty peer pool
d.peers = map[string]*UDPActor{}
@@ -226,13 +222,11 @@ func (d *UDPDirector) handleRead() {
}
pdu := NewCPDU(readBytes, NKW(KWCPCISource, saddr, KWCPCIDestination, daddr), WithRootMessage(bvlc)) // TODO: why do we set the destination here??? This might be completely wrong
// send the _PDU up to the client
- d.wg.Add(1)
- go func() {
- defer d.wg.Done()
+ d.wg.Go(func() {
if err := d._response(pdu); err != nil {
d.log.Debug().Err(err).Msg("errored")
}
- }()
+ })
}
func (d *UDPDirector) writeable() {
diff --git a/plc4go/internal/cbus/Browser.go b/plc4go/internal/cbus/Browser.go
index 689348d8ff..85a3f73b38 100644
--- a/plc4go/internal/cbus/Browser.go
+++ b/plc4go/internal/cbus/Browser.go
@@ -339,9 +339,7 @@ func (m *Browser) getInstalledUnitAddressBytes(ctx context.Context) (map[byte]an
readCtx, readCtxCancel := context.WithTimeout(ctx, 2*time.Second)
defer readCtxCancel()
readWg := new(sync.WaitGroup)
- readWg.Add(1)
- go func() {
- defer readWg.Done()
+ readWg.Go(func() {
defer func() {
if err := recover(); err != nil {
m.log.Error().
@@ -431,7 +429,7 @@ func (m *Browser) getInstalledUnitAddressBytes(ctx context.Context) (map[byte]an
Stringer("responseCode", responseCode).
Msg("We got responseCode as response code for installation mmi so we rely on getting it via subscription")
}
- }()
+ })
syncCtx, syncCtxCancel := context.WithTimeout(ctx, 6*time.Second)
defer syncCtxCancel()
diff --git a/plc4go/internal/cbus/CBusMessageMapper_test.go b/plc4go/internal/cbus/CBusMessageMapper_test.go
index 31833fe0e8..f542c78d73 100644
--- a/plc4go/internal/cbus/CBusMessageMapper_test.go
+++ b/plc4go/internal/cbus/CBusMessageMapper_test.go
@@ -20,6 +20,7 @@
package cbus
import (
+ "context"
"fmt"
"testing"
@@ -1424,7 +1425,7 @@ func TestMapEncodedReply(t *testing.T) {
})
transaction :=
transactionManager.StartTransaction()
t.Logf("Submitting No-Op to transaction\n%v", transaction)
- transaction.Submit(func(transaction transactions.RequestTransaction) {
+ transaction.Submit(func(context.Context, transactions.RequestTransaction) {
// NO-OP
})
args.transaction = transaction
@@ -1501,7 +1502,7 @@ func TestMapEncodedReply(t *testing.T) {
})
transaction :=
transactionManager.StartTransaction()
t.Logf("Submitting No-Op to transaction %v", transaction)
- transaction.Submit(func(transaction transactions.RequestTransaction) {
+ transaction.Submit(func(context.Context, transactions.RequestTransaction) {
// NO-OP
t.Log("No op-ing")
})
@@ -1594,7 +1595,7 @@ func TestMapEncodedReply(t *testing.T) {
assert.NoError(t, transactionManager.Close())
})
transaction := transactionManager.StartTransaction()
- transaction.Submit(func(transaction transactions.RequestTransaction) {
+ transaction.Submit(func(context.Context, transactions.RequestTransaction) {
// NO-OP
})
args.transaction = transaction
@@ -1677,7 +1678,7 @@ func TestMapEncodedReply(t *testing.T) {
assert.NoError(t, transactionManager.Close())
})
transaction := transactionManager.StartTransaction()
- transaction.Submit(func(transaction transactions.RequestTransaction) {
+ transaction.Submit(func(context.Context, transactions.RequestTransaction) {
// NO-OP
})
args.transaction = transaction
@@ -1743,7 +1744,7 @@ func TestMapEncodedReply(t *testing.T) {
assert.NoError(t, transactionManager.Close())
})
transaction := transactionManager.StartTransaction()
- transaction.Submit(func(transaction transactions.RequestTransaction) {
+ transaction.Submit(func(context.Context, transactions.RequestTransaction) {
// NO-OP
})
args.transaction = transaction
@@ -1812,7 +1813,7 @@ func TestMapEncodedReply(t *testing.T) {
assert.NoError(t, transactionManager.Close())
})
transaction := transactionManager.StartTransaction()
- transaction.Submit(func(transaction transactions.RequestTransaction) {
+ transaction.Submit(func(context.Context, transactions.RequestTransaction) {
// NO-OP
})
args.transaction = transaction
@@ -1900,7 +1901,7 @@ func TestMapEncodedReply(t *testing.T) {
assert.NoError(t, transactionManager.Close())
})
transaction := transactionManager.StartTransaction()
- transaction.Submit(func(transaction transactions.RequestTransaction) {
+ transaction.Submit(func(context.Context, transactions.RequestTransaction) {
// NO-OP
})
args.transaction = transaction
@@ -2006,7 +2007,7 @@ func TestMapEncodedReply(t *testing.T) {
assert.NoError(t, transactionManager.Close())
})
transaction := transactionManager.StartTransaction()
- transaction.Submit(func(transaction transactions.RequestTransaction) {
+ transaction.Submit(func(context.Context, transactions.RequestTransaction) {
// NO-OP
})
args.transaction = transaction
@@ -2076,7 +2077,7 @@ func TestMapEncodedReply(t *testing.T) {
assert.NoError(t, transactionManager.Close())
})
transaction := transactionManager.StartTransaction()
- transaction.Submit(func(transaction transactions.RequestTransaction) {
+ transaction.Submit(func(context.Context, transactions.RequestTransaction) {
// NO-OP
})
args.transaction = transaction
@@ -2142,7 +2143,7 @@ func TestMapEncodedReply(t *testing.T) {
assert.NoError(t, transactionManager.Close())
})
transaction := transactionManager.StartTransaction()
- transaction.Submit(func(transaction transactions.RequestTransaction) {
+ transaction.Submit(func(context.Context, transactions.RequestTransaction) {
// NO-OP
})
args.transaction = transaction
@@ -2208,7 +2209,7 @@ func TestMapEncodedReply(t *testing.T) {
assert.NoError(t, transactionManager.Close())
})
transaction := transactionManager.StartTransaction()
- transaction.Submit(func(transaction transactions.RequestTransaction) {
+ transaction.Submit(func(context.Context, transactions.RequestTransaction) {
// NO-OP
})
args.transaction = transaction
@@ -2274,7 +2275,7 @@ func TestMapEncodedReply(t *testing.T) {
assert.NoError(t, transactionManager.Close())
})
transaction := transactionManager.StartTransaction()
- transaction.Submit(func(transaction transactions.RequestTransaction) {
+ transaction.Submit(func(context.Context, transactions.RequestTransaction) {
// NO-OP
})
args.transaction = transaction
@@ -2340,7 +2341,7 @@ func TestMapEncodedReply(t *testing.T) {
assert.NoError(t, transactionManager.Close())
})
transaction := transactionManager.StartTransaction()
- transaction.Submit(func(transaction transactions.RequestTransaction) {
+ transaction.Submit(func(context.Context, transactions.RequestTransaction) {
// NO-OP
})
args.transaction = transaction
@@ -2426,7 +2427,7 @@ func TestMapEncodedReply(t *testing.T) {
assert.NoError(t, transactionManager.Close())
})
transaction := transactionManager.StartTransaction()
- transaction.Submit(func(transaction transactions.RequestTransaction) {
+ transaction.Submit(func(context.Context, transactions.RequestTransaction) {
// NO-OP
})
args.transaction = transaction
@@ -2492,7 +2493,7 @@ func TestMapEncodedReply(t *testing.T) {
assert.NoError(t, transactionManager.Close())
})
transaction := transactionManager.StartTransaction()
- transaction.Submit(func(transaction transactions.RequestTransaction) {
+ transaction.Submit(func(context.Context, transactions.RequestTransaction) {
// NO-OP
})
args.transaction = transaction
@@ -2558,7 +2559,7 @@ func TestMapEncodedReply(t *testing.T) {
assert.NoError(t, transactionManager.Close())
})
transaction := transactionManager.StartTransaction()
- transaction.Submit(func(transaction transactions.RequestTransaction) {
+ transaction.Submit(func(context.Context, transactions.RequestTransaction) {
// NO-OP
})
args.transaction = transaction
@@ -2624,7 +2625,7 @@ func TestMapEncodedReply(t *testing.T) {
assert.NoError(t, transactionManager.Close())
})
transaction := transactionManager.StartTransaction()
- transaction.Submit(func(transaction transactions.RequestTransaction) {
+ transaction.Submit(func(context.Context, transactions.RequestTransaction) {
// NO-OP
})
args.transaction = transaction
@@ -2690,7 +2691,7 @@ func TestMapEncodedReply(t *testing.T) {
assert.NoError(t, transactionManager.Close())
})
transaction := transactionManager.StartTransaction()
- transaction.Submit(func(transaction transactions.RequestTransaction) {
+ transaction.Submit(func(context.Context, transactions.RequestTransaction) {
// NO-OP
})
args.transaction = transaction
@@ -2756,7 +2757,7 @@ func TestMapEncodedReply(t *testing.T) {
assert.NoError(t, transactionManager.Close())
})
transaction := transactionManager.StartTransaction()
- transaction.Submit(func(transaction transactions.RequestTransaction) {
+ transaction.Submit(func(context.Context, transactions.RequestTransaction) {
// NO-OP
})
args.transaction = transaction
@@ -2838,7 +2839,7 @@ func TestMapEncodedReply(t *testing.T) {
assert.NoError(t, transactionManager.Close())
})
transaction := transactionManager.StartTransaction()
- transaction.Submit(func(transaction transactions.RequestTransaction) {
+ transaction.Submit(func(context.Context, transactions.RequestTransaction) {
// NO-OP
})
args.transaction = transaction
@@ -2904,7 +2905,7 @@ func TestMapEncodedReply(t *testing.T) {
assert.NoError(t, transactionManager.Close())
})
transaction := transactionManager.StartTransaction()
- transaction.Submit(func(transaction transactions.RequestTransaction) {
+ transaction.Submit(func(context.Context, transactions.RequestTransaction) {
// NO-OP
})
args.transaction = transaction
@@ -2970,7 +2971,7 @@ func TestMapEncodedReply(t *testing.T) {
assert.NoError(t, transactionManager.Close())
})
transaction := transactionManager.StartTransaction()
- transaction.Submit(func(transaction transactions.RequestTransaction) {
+ transaction.Submit(func(context.Context, transactions.RequestTransaction) {
// NO-OP
})
args.transaction = transaction
diff --git a/plc4go/internal/cbus/Connection.go b/plc4go/internal/cbus/Connection.go
index d2ae5a5378..b16248c280 100644
--- a/plc4go/internal/cbus/Connection.go
+++ b/plc4go/internal/cbus/Connection.go
@@ -129,9 +129,7 @@ func (c *Connection) GetMessageCodec() spi.MessageCodec {
func (c *Connection) ConnectWithContext(ctx context.Context) <-chan plc4go.PlcConnectionConnectResult {
c.log.Trace().Msg("Connecting")
ch := make(chan plc4go.PlcConnectionConnectResult, 1)
- c.wg.Add(1)
- go func() {
- defer c.wg.Done()
+ c.wg.Go(func() {
defer func() {
if err := recover(); err != nil {
c.fireConnectionError(errors.Errorf("panic-ed %v. Stack:\n%s", err, debug.Stack()), ch)
@@ -154,21 +152,19 @@ func (c *Connection) ConnectWithContext(ctx context.Context) <-chan plc4go.PlcCo
}
c.setupConnection(ctx, ch)
- }()
+ })
return ch
}
func (c *Connection) Close() <-chan plc4go.PlcConnectionCloseResult {
results := make(chan plc4go.PlcConnectionCloseResult, 1)
- c.wg.Add(1)
- go func() {
- defer c.wg.Done()
+ c.wg.Go(func() {
result := <-c.DefaultConnection.Close()
c.log.Trace().Msg("Waiting for handlers to stop")
c.handlerWaitGroup.Wait()
c.log.Trace().Msg("handlers stopped, dispatching result")
results <- result
- }()
+ })
return results
}
@@ -271,12 +267,8 @@ func (c *Connection) setupConnection(ctx context.Context, ch chan plc4go.PlcConn
func (c *Connection) startSubscriptionHandler() {
c.log.Debug().Msg("Starting SAL handler")
- c.handlerWaitGroup.Add(1)
- c.wg.Add(1)
- go func() {
- defer c.wg.Done()
+ c.handlerWaitGroup.Go(func() {
salLogger := c.log.With().Str("handlerType", "SAL").Logger()
- defer c.handlerWaitGroup.Done()
defer func() {
if err := recover(); err != nil {
salLogger.Error().
@@ -313,14 +305,10 @@ func (c *Connection) startSubscriptionHandler() {
}
}
salLogger.Info().Msg("handler ended")
- }()
+ })
c.log.Debug().Msg("Starting MMI handler")
- c.handlerWaitGroup.Add(1)
- c.wg.Add(1)
- go func() {
- defer c.wg.Done()
+ c.handlerWaitGroup.Go(func() {
mmiLogger := c.log.With().Str("handlerType", "MMI").Logger()
- defer c.handlerWaitGroup.Done()
defer func() {
if err := recover(); err != nil {
mmiLogger.Error().
@@ -352,7 +340,7 @@ func (c *Connection) startSubscriptionHandler() {
}
}
mmiLogger.Info().Msg("handler ended")
- }()
+ })
}
func (c *Connection) sendReset(ctx context.Context, ch chan plc4go.PlcConnectionConnectResult, cbusOptions *readWriteModel.CBusOptions, requestContext *readWriteModel.RequestContext, sendOutErrorNotification bool) (ok bool) {
diff --git a/plc4go/internal/cbus/Connection_test.go b/plc4go/internal/cbus/Connection_test.go
index f77bbd3c57..cea1bfd9da 100644
--- a/plc4go/internal/cbus/Connection_test.go
+++ b/plc4go/internal/cbus/Connection_test.go
@@ -1685,10 +1685,8 @@ func TestConnection_startSubscriptionHandler(t *testing.T) {
codec.monitoredMMIs = make(chan readWriteModel.CALReply, 1)
codec.monitoredSALs = make(chan readWriteModel.MonitoredSAL, 1)
dispatchWg := new(sync.WaitGroup)
- dispatchWg.Add(1)
t.Cleanup(dispatchWg.Wait)
- go func() {
- defer dispatchWg.Done()
+ dispatchWg.Go(func() {
codec.monitoredMMIs <- readWriteModel.NewCALReplyShort(0, nil)
codec.monitoredSALs <- readWriteModel.NewMonitoredSALShortFormBasicMode(
0,
@@ -1699,7 +1697,7 @@ func TestConnection_startSubscriptionHandler(t *testing.T) {
readWriteModel.ApplicationIdContainer_ACCESS_CONTROL_D5,
nil,
)
- }()
+ })
t.Cleanup(func() {
assert.NoError(t, codec.Disconnect())
})
@@ -1718,10 +1716,8 @@ func TestConnection_startSubscriptionHandler(t *testing.T) {
codec := NewMessageCodec(nil, _options...)
written := make(chan struct{})
dispatchWg := new(sync.WaitGroup)
- dispatchWg.Add(1)
t.Cleanup(dispatchWg.Wait)
- go func() {
- defer dispatchWg.Done()
+ dispatchWg.Go(func() {
codec.monitoredMMIs <- readWriteModel.NewCALReplyShort(0, nil)
codec.monitoredSALs <- readWriteModel.NewMonitoredSALShortFormBasicMode(
0,
@@ -1733,7 +1729,7 @@ func TestConnection_startSubscriptionHandler(t *testing.T) {
nil,
)
close(written)
- }()
+ })
t.Cleanup(func() {
<-written
})
diff --git a/plc4go/internal/cbus/Reader.go b/plc4go/internal/cbus/Reader.go
index ed7dc46e28..75a5eac2e8 100644
--- a/plc4go/internal/cbus/Reader.go
+++ b/plc4go/internal/cbus/Reader.go
@@ -128,7 +128,9 @@ func (m *Reader) readSync(ctx context.Context, readRequest apiModel.PlcReadReque
func (m *Reader) createMessageTransactionAndWait(ctx context.Context, messageToSend readWriteModel.CBusMessage, addResponseCode func(name string, responseCode apiModel.PlcResponseCode), tagName string, addPlcValue func(name string, plcValue apiValues.PlcValue)) {
// Start a new request-transaction (Is ended in the response-handler)
transaction := m.tm.StartTransaction()
- transaction.Submit(func(transaction transactions.RequestTransaction) {
+ transaction.Submit(func(transactionContext context.Context, transaction transactions.RequestTransaction) {
+ ctx, cancel := context.WithCancel(ctx)
+ context.AfterFunc(transactionContext, cancel)
m.log.Trace().Stringer("transaction", transaction).Msg("Transaction getting handled")
m.sendMessageOverTheWire(ctx, transaction, messageToSend, addResponseCode, tagName, addPlcValue)
})
diff --git a/plc4go/internal/cbus/Subscriber.go b/plc4go/internal/cbus/Subscriber.go
index 939858b5c8..73801431c6 100644
--- a/plc4go/internal/cbus/Subscriber.go
+++ b/plc4go/internal/cbus/Subscriber.go
@@ -64,9 +64,7 @@ func NewSubscriber(addSubscriber func(subscriber *Subscriber), _options ...optio
func (s *Subscriber) Subscribe(_ context.Context, subscriptionRequest apiModel.PlcSubscriptionRequest) <-chan apiModel.PlcSubscriptionRequestResult {
result := make(chan apiModel.PlcSubscriptionRequestResult, 1)
- s.wg.Add(1)
- go func() {
- defer s.wg.Done()
+ s.wg.Go(func() {
defer func() {
if err := recover(); err != nil {
result <- spiModel.NewDefaultPlcSubscriptionRequestResult(subscriptionRequest, nil, errors.Errorf("panic-ed %v. Stack: %s", err, debug.Stack()))
@@ -106,7 +104,7 @@ func (s *Subscriber) Subscribe(_ context.Context, subscriptionRequest apiModel.P
),
nil,
)
- }()
+ })
return result
}
diff --git a/plc4go/internal/cbus/Writer.go b/plc4go/internal/cbus/Writer.go
index a83ade5de4..9e7fc790bd 100644
--- a/plc4go/internal/cbus/Writer.go
+++ b/plc4go/internal/cbus/Writer.go
@@ -60,9 +60,7 @@ func NewWriter(tpduGenerator *AlphaGenerator, messageCodec *MessageCodec, tm tra
func (m *Writer) Write(ctx context.Context, writeRequest apiModel.PlcWriteRequest) <-chan apiModel.PlcWriteRequestResult {
m.log.Trace().Msg("Writing")
result := make(chan apiModel.PlcWriteRequestResult, 1)
- m.wg.Add(1)
- go func() {
- defer m.wg.Done()
+ m.wg.Go(func() {
defer func() {
if err := recover(); err != nil {
result <- spiModel.NewDefaultPlcWriteRequestResult(writeRequest, nil, errors.Errorf("panic-ed %v. Stack: %s", err, debug.Stack()))
@@ -116,7 +114,7 @@ func (m *Writer) Write(ctx context.Context, writeRequest apiModel.PlcWriteReques
tagNameCopy := tagName
// Start a new request-transaction (Is ended in the response-handler)
transaction := m.tm.StartTransaction()
- transaction.Submit(func(transaction transactions.RequestTransaction) {
+ transaction.Submit(func(transactionContext context.Context, transaction transactions.RequestTransaction) {
+ ctx, cancel := context.WithCancel(ctx)
+ context.AfterFunc(transactionContext, cancel)
// Send the over the wire
m.log.Trace().Msg("Send ")
if err := m.messageCodec.SendRequest(ctx, messageToSend, func(receivedMessage spi.Message) bool {
@@ -159,6 +159,6 @@ func (m *Writer) Write(ctx context.Context, writeRequest apiModel.PlcWriteReques
}
readResponse := spiModel.NewDefaultPlcWriteResponse(writeRequest, responseCodes)
result <- spiModel.NewDefaultPlcWriteRequestResult(writeRequest, readResponse, nil)
- }()
+ })
return result
}
diff --git a/plc4go/internal/eip/Connection.go b/plc4go/internal/eip/Connection.go
index 9d4d77f62a..46d67a205e 100644
--- a/plc4go/internal/eip/Connection.go
+++ b/plc4go/internal/eip/Connection.go
@@ -130,9 +130,7 @@ func (c *Connection) GetMessageCodec() spi.MessageCodec {
func (c *Connection) ConnectWithContext(ctx context.Context) <-chan plc4go.PlcConnectionConnectResult {
c.log.Trace().Msg("Connecting")
ch := make(chan plc4go.PlcConnectionConnectResult, 1)
- c.wg.Add(1)
- go func() {
- defer c.wg.Done()
+ c.wg.Go(func() {
defer func() {
if err := recover(); err != nil {
ch <- _default.NewDefaultPlcConnectionConnectResult(nil, errors.Errorf("panic-ed %v. Stack: %s", err, debug.Stack()))
@@ -155,7 +153,7 @@ func (c *Connection) ConnectWithContext(ctx context.Context) <-chan plc4go.PlcCo
}
c.setupConnection(ctx, ch)
- }()
+ })
return ch
}
@@ -163,9 +161,7 @@ func (c *Connection) Close() <-chan plc4go.PlcConnectionCloseResult {
// TODO: use proper context
ctx := context.TODO()
result := make(chan plc4go.PlcConnectionCloseResult, 1)
- c.wg.Add(1)
- go func() {
- defer c.wg.Done()
+ c.wg.Go(func() {
defer func() {
if err := recover(); err != nil {
result <- _default.NewDefaultPlcConnectionCloseResult(c, errors.Errorf("panic-ed %v. Stack: %s", err, debug.Stack()))
@@ -193,7 +189,7 @@ func (c *Connection) Close() <-chan plc4go.PlcConnectionCloseResult {
Uint32("sessionHandle", c.sessionHandle).
Msg("Unregistred Session %d")
result <- _default.NewDefaultPlcConnectionCloseResult(c, nil)
- }()
+ })
return result
}
diff --git a/plc4go/internal/eip/Reader.go b/plc4go/internal/eip/Reader.go
index 0e3f6784fa..b645e3be26 100644
--- a/plc4go/internal/eip/Reader.go
+++ b/plc4go/internal/eip/Reader.go
@@ -70,9 +70,7 @@ func (m *Reader) Read(ctx context.Context, readRequest apiModel.PlcReadRequest)
// TODO: handle ctx
m.log.Trace().Msg("Reading")
result := make(chan apiModel.PlcReadRequestResult, 1)
- m.wg.Add(1)
- go func() {
- defer m.wg.Done()
+ m.wg.Go(func() {
defer func() {
if err := recover(); err != nil {
result <- spiModel.NewDefaultPlcReadRequestResult(readRequest, nil, errors.Errorf("panic-ed %v. Stack: %s", err, debug.Stack()))
@@ -109,7 +107,9 @@ func (m *Reader) Read(ctx context.Context, readRequest apiModel.PlcReadRequest)
typeIds,
)
transaction := m.tm.StartTransaction()
- transaction.Submit(func(transaction transactions.RequestTransaction) {
+ transaction.Submit(func(transactionContext context.Context, transaction transactions.RequestTransaction) {
+ ctx, cancel := context.WithCancel(ctx)
+ context.AfterFunc(transactionContext, cancel)
if err := m.messageCodec.SendRequest(
ctx,
request,
@@ -167,7 +167,7 @@ func (m *Reader) Read(ctx context.Context, readRequest apiModel.PlcReadRequest)
}
})
}
- }()
+ })
return result
}
diff --git a/plc4go/internal/eip/Writer.go b/plc4go/internal/eip/Writer.go
index 0c7ef52a16..5bc832ec64 100644
--- a/plc4go/internal/eip/Writer.go
+++ b/plc4go/internal/eip/Writer.go
@@ -66,9 +66,7 @@ func NewWriter(messageCodec spi.MessageCodec, tm transactions.RequestTransaction
func (m *Writer) Write(ctx context.Context, writeRequest apiModel.PlcWriteRequest) <-chan apiModel.PlcWriteRequestResult {
// TODO: handle context
result := make(chan apiModel.PlcWriteRequestResult, 1)
- m.wg.Add(1)
- go func() {
- defer m.wg.Done()
+ m.wg.Go(func() {
defer func() {
if err := recover(); err != nil {
result <- spiModel.NewDefaultPlcWriteRequestResult(writeRequest, nil, errors.Errorf("panic-ed %v. Stack: %s", err, debug.Stack()))
@@ -283,7 +281,7 @@ func (m *Writer) Write(ctx context.Context, writeRequest apiModel.PlcWriteReques
}
})
}*/
- }()
+ })
return result
}
diff --git a/plc4go/internal/knxnetip/Connection.go b/plc4go/internal/knxnetip/Connection.go
index ba1d29104c..1dc7ffaa2e 100644
--- a/plc4go/internal/knxnetip/Connection.go
+++ b/plc4go/internal/knxnetip/Connection.go
@@ -240,9 +240,7 @@ func (m *Connection) ConnectWithContext(ctx context.Context) <-chan plc4go.PlcCo
result <- _default.NewDefaultPlcConnectionConnectResult(connection, err)
}
- m.wg.Add(1)
- go func() {
- defer m.wg.Done()
+ m.wg.Go(func() {
defer func() {
if err := recover(); err != nil {
result <- _default.NewDefaultPlcConnectionConnectResult(nil, errors.Errorf("panic-ed %v. Stack: %s", err, debug.Stack()))
@@ -314,9 +312,7 @@ func (m *Connection) ConnectWithContext(ctx context.Context) <-chan plc4go.PlcCo
// handled by any other handler. This is where usually the GroupValueWrite messages
// are being handled.
m.log.Debug().Msg("Starting tunneling handler")
- m.wg.Add(1)
- go func() {
- defer m.wg.Done()
+ m.wg.Go(func() {
defer func() {
if err := recover(); err != nil {
m.log.Error().
@@ -374,7 +370,7 @@ func (m *Connection) ConnectWithContext(ctx context.Context) <-chan plc4go.PlcCo
}
}
m.log.Warn().Msg("Tunneling handler shat down")
- }()
+ })
// Fire the "connected" event
sendResult(m, nil)
@@ -386,7 +382,7 @@ func (m *Connection) ConnectWithContext(ctx context.Context) <-chan plc4go.PlcCo
} else {
m.doSomethingAndClose(func() { sendResult(nil, errors.New("this device doesn't support tunneling")) })
}
- }()
+ })
return result
}
@@ -419,9 +415,7 @@ func (m *Connection) Close() <-chan plc4go.PlcConnectionCloseResult {
ctx := context.TODO()
result := make(chan plc4go.PlcConnectionCloseResult, 1)
- m.wg.Add(1)
- go func() {
- defer m.wg.Done()
+ m.wg.Go(func() {
defer func() {
if err := recover(); err != nil {
result <- _default.NewDefaultPlcConnectionConnectResult(nil, errors.Errorf("panic-ed %v. Stack: %s", err, debug.Stack()))
@@ -455,7 +449,7 @@ func (m *Connection) Close() <-chan plc4go.PlcConnectionCloseResult {
} else {
result <- _default.NewDefaultPlcConnectionCloseResult(m, nil)
}
- }()
+ })
return result
}
@@ -484,9 +478,7 @@ func (m *Connection) Ping() <-chan plc4go.PlcConnectionPingResult {
ctx := context.TODO()
result := make(chan plc4go.PlcConnectionPingResult, 1)
- m.wg.Add(1)
- go func() {
- defer m.wg.Done()
+ m.wg.Go(func() {
defer func() {
if err := recover(); err != nil {
result <- _default.NewDefaultPlcConnectionPingResult(errors.Errorf("panic-ed %v. Stack: %s", err, debug.Stack()))
@@ -500,7 +492,7 @@ func (m *Connection) Ping() <-chan plc4go.PlcConnectionPingResult {
result <- _default.NewDefaultPlcConnectionPingResult(nil)
}
return
- }()
+ })
return result
}
diff --git a/plc4go/internal/knxnetip/ConnectionDriverSpecificOperations.go b/plc4go/internal/knxnetip/ConnectionDriverSpecificOperations.go
index 6e56ff0a8c..c8c7baa048 100644
--- a/plc4go/internal/knxnetip/ConnectionDriverSpecificOperations.go
+++ b/plc4go/internal/knxnetip/ConnectionDriverSpecificOperations.go
@@ -65,9 +65,7 @@ func (m *Connection) ReadGroupAddress(ctx context.Context, groupAddress []byte,
}
}
- m.wg.Add(1)
- go func() {
- defer m.wg.Done()
+ m.wg.Go(func() {
defer func() {
if err := recover(); err != nil {
m.log.Error().
@@ -107,7 +105,7 @@ func (m *Connection) ReadGroupAddress(ctx context.Context, groupAddress []byte,
// Return the value
sendResponse(plcValue, 1, nil)
- }()
+ })
return result
}
@@ -130,9 +128,7 @@ func (m *Connection) DeviceConnect(ctx context.Context, targetAddress driverMode
}
}
- m.wg.Add(1)
- go func() {
- defer m.wg.Done()
+ m.wg.Go(func() {
defer func() {
if err := recover(); err != nil {
m.log.Error().
@@ -208,7 +204,7 @@ func (m *Connection) DeviceConnect(ctx context.Context, targetAddress driverMode
connection.maxApdu = uint16(math.Min(float64(deviceApduSize), 240))
sendResponse(connection, nil)
- }()
+ })
return result
}
@@ -231,9 +227,7 @@ func (m *Connection) DeviceDisconnect(ctx context.Context, targetAddress driverM
}
}
- m.wg.Add(1)
- go func() {
- defer m.wg.Done()
+ m.wg.Go(func() {
defer func() {
if err := recover(); err != nil {
m.log.Error().
@@ -252,7 +246,7 @@ func (m *Connection) DeviceDisconnect(ctx context.Context, targetAddress driverM
} else {
sendResponse(connection, nil)
}
- }()
+ })
return result
}
@@ -274,9 +268,7 @@ func (m *Connection) DeviceAuthenticate(ctx context.Context, targetAddress drive
}
}
- m.wg.Add(1)
- go func() {
- defer m.wg.Done()
+ m.wg.Go(func() {
defer func() {
if err := recover(); err != nil {
m.log.Error().
@@ -315,7 +307,7 @@ func (m *Connection) DeviceAuthenticate(ctx context.Context, targetAddress drive
} else {
sendResponse(errors.Errorf("got error authenticating at device %s", KnxAddressToString(targetAddress)))
}
- }()
+ })
return result
}
@@ -339,9 +331,7 @@ func (m *Connection) DeviceReadProperty(ctx context.Context, targetAddress drive
}
}
- m.wg.Add(1)
- go func() {
- defer m.wg.Done()
+ m.wg.Go(func() {
defer func() {
if err := recover(); err != nil {
m.log.Error().
@@ -405,7 +395,7 @@ func (m *Connection) DeviceReadProperty(ctx context.Context, targetAddress drive
} else {
sendResponse(plcValue, 1, err)
}
- }()
+ })
return result
}
@@ -429,9 +419,7 @@ func (m *Connection) DeviceReadPropertyDescriptor(ctx context.Context, targetAdd
}
}
- m.wg.Add(1)
- go func() {
- defer m.wg.Done()
+ m.wg.Go(func() {
defer func() {
if err := recover(); err != nil {
m.log.Error().
@@ -475,7 +463,7 @@ func (m *Connection) DeviceReadPropertyDescriptor(ctx context.Context, targetAdd
val["writeLevel"] = spiValues.NewPlcSTRING(propertyDescriptionResponse.GetWriteLevel().String())
str := spiValues.NewPlcStruct(val)
sendResponse(&str, 1, nil)
- }()
+ })
return result
}
@@ -499,9 +487,7 @@ func (m *Connection) DeviceReadMemory(ctx context.Context, targetAddress driverM
}
}
- m.wg.Add(1)
- go func() {
- defer m.wg.Done()
+ m.wg.Go(func() {
defer func() {
if err := recover(); err != nil {
m.log.Error().
@@ -588,7 +574,7 @@ func (m *Connection) DeviceReadMemory(ctx context.Context, targetAddress driverM
} else if len(results) == 1 {
sendResponse(results[0], 1, nil)
}
- }()
+ })
return result
}
diff --git a/plc4go/internal/knxnetip/ConnectionHelper.go b/plc4go/internal/knxnetip/ConnectionHelper.go
index feadf9bd67..1a4b7d55af 100644
--- a/plc4go/internal/knxnetip/ConnectionHelper.go
+++ b/plc4go/internal/knxnetip/ConnectionHelper.go
@@ -54,9 +54,7 @@ func (m *Connection) castIpToKnxAddress(ip net.IP) driverModel.IPAddress {
}
func (m *Connection) handleIncomingTunnelingRequest(ctx context.Context, tunnelingRequest driverModel.TunnelingRequest) {
- m.wg.Add(1)
- go func() {
- defer m.wg.Done()
+ m.wg.Go(func() {
defer func() {
if err := recover(); err != nil {
m.log.Error().
@@ -115,7 +113,7 @@ func (m *Connection) handleIncomingTunnelingRequest(ctx context.Context, tunneli
default:
m.log.Info().Msg("Unknown unhandled message.")
}
- }()
+ })
}
func (m *Connection) handleValueCacheUpdate(ctx context.Context, destinationAddress []byte, payload []byte) {
@@ -142,9 +140,7 @@ func (m *Connection) handleTimeout() {
// If this is the first timeout in a sequence, start the timer.
/* if m.connectionTimeoutTimer == nil {
m.connectionTimeoutTimer = time.NewTimer(m.connectionTtl)
- m.wg.Add(1)
- go func() {
- defer m.wg.Done()
+ m.wg.Go(func() {
<-m.connectionTimeoutTimer.C
m.resetConnection()
}()
diff --git a/plc4go/internal/knxnetip/Discoverer.go b/plc4go/internal/knxnetip/Discoverer.go
index a6a98a7603..67568a4852 100644
--- a/plc4go/internal/knxnetip/Discoverer.go
+++ b/plc4go/internal/knxnetip/Discoverer.go
@@ -119,8 +119,7 @@ func (d *Discoverer) Discover(ctx context.Context, callback func(event apiModel.
if err != nil {
return err
}
- wg.Add(1)
- go func(netInterface net.Interface) {
+ wg.Go(func() {
defer func() {
if err := recover(); err != nil {
d.log.Error().
@@ -129,7 +128,6 @@ func (d *Discoverer) Discover(ctx context.Context, callback func(event apiModel.
Msg("panic-ed")
}
}()
- defer func() { wg.Done() }()
// Iterate over all addresses the current interface has configured
// For KNX we're only interested in IPv4 addresses, as it doesn't
// seem to work with IPv6.
@@ -157,19 +155,15 @@ func (d *Discoverer) Discover(ctx context.Context, callback func(event apiModel.
}
d.transportInstanceCreationQueue.Submit(ctx, d.transportInstanceCreationWorkItemId.Add(1), d.createTransportInstanceDispatcher(ctx, wg, connectionUrl, ipv4Addr, udpTransport, transportInstances))
}
- }(netInterface)
+ })
}
- d.wg.Add(1)
- go func() {
- defer d.wg.Done()
+ d.wg.Go(func() {
wg.Wait()
d.log.Trace().Msg("Closing transport instance channel")
close(transportInstances)
- }()
+ })
- d.wg.Add(1)
- go func() {
- defer d.wg.Done()
+ d.wg.Go(func() {
defer func() {
if err := recover(); err != nil {
d.log.Error().
@@ -185,13 +179,15 @@ func (d *Discoverer) Discover(ctx context.Context, callback func(event apiModel.
 		}
 		d.deviceScanningQueue.Submit(ctx, d.deviceScanningWorkItemId.Add(1), d.createDeviceScanDispatcher(ctx, transportInstance.(*udp.TransportInstance), callback))
}
- }()
+ })
return nil
}
func (d *Discoverer) createTransportInstanceDispatcher(ctx context.Context, wg *sync.WaitGroup, connectionUrl *url.URL, ipv4Addr net.IP, udpTransport *udp.Transport, transportInstances chan transports.TransportInstance) pool.Runnable {
wg.Add(1)
- return func() {
+ return func(workerCtx context.Context) {
+ ctx, cancel := context.WithCancel(ctx)
+ context.AfterFunc(workerCtx, cancel)
defer wg.Done()
 		// Create a new "connection" (Actually open a local udp socket and target outgoing packets to that address)
transportInstance, err :=
@@ -212,7 +208,9 @@ func (d *Discoverer) createTransportInstanceDispatcher(ctx context.Context, wg *
}
func (d *Discoverer) createDeviceScanDispatcher(ctx context.Context, udpTransportInstance *udp.TransportInstance, callback func(event apiModel.PlcDiscoveryItem)) pool.Runnable {
- return func() {
+ return func(workerCtx context.Context) {
+ ctx, cancel := context.WithCancel(ctx)
+ context.AfterFunc(workerCtx, cancel)
 		d.log.Debug().Stringer("udpTransportInstance", udpTransportInstance).Msg("Scanning")
// Create a codec for sending and receiving messages.
codec := NewMessageCodec(
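The two dispatchers above also show how the new ctx parameter on pool.Runnable is consumed: the request-scoped ctx is re-derived and context.AfterFunc (Go 1.21+) cancels it as soon as the worker's ctx ends, so whichever side gives up first stops the work. A self-contained sketch of that bridging pattern, with bridge as a hypothetical helper name:

	package main

	import (
		"context"
		"fmt"
		"time"
	)

	// bridge derives a child of the request context and lets context.AfterFunc
	// cancel it the moment the worker context ends.
	func bridge(requestCtx, workerCtx context.Context) (context.Context, context.CancelFunc) {
		ctx, cancel := context.WithCancel(requestCtx)
		context.AfterFunc(workerCtx, cancel)
		return ctx, cancel
	}

	func main() {
		workerCtx, stopWorker := context.WithCancel(context.Background())
		ctx, cancel := bridge(context.Background(), workerCtx)
		defer cancel()

		// Simulate the pool shutting this worker down shortly.
		go func() {
			time.Sleep(50 * time.Millisecond)
			stopWorker()
		}()

		<-ctx.Done() // unblocks once the worker context is cancelled
		fmt.Println("dispatcher stopped:", ctx.Err())
	}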
diff --git a/plc4go/internal/knxnetip/Reader.go b/plc4go/internal/knxnetip/Reader.go
index 6a22617701..cbc2634694 100644
--- a/plc4go/internal/knxnetip/Reader.go
+++ b/plc4go/internal/knxnetip/Reader.go
@@ -58,9 +58,7 @@ func NewReader(connection *Connection, _options ...options.WithOption) *Reader {
 func (m *Reader) Read(ctx context.Context, readRequest apiModel.PlcReadRequest) <-chan apiModel.PlcReadRequestResult {
// TODO: handle ctx
resultChan := make(chan apiModel.PlcReadRequestResult, 1)
- m.wg.Add(1)
- go func() {
- defer m.wg.Done()
+ m.wg.Go(func() {
defer func() {
if err := recover(); err != nil {
 				resultChan <- spiModel.NewDefaultPlcReadRequestResult(readRequest, nil, errors.Errorf("panic-ed %v. Stack: %s", err, debug.Stack()))
@@ -179,7 +177,7 @@ func (m *Reader) Read(ctx context.Context, readRequest apiModel.PlcReadRequest)
result,
nil,
)
- }()
+ })
return resultChan
}
diff --git a/plc4go/internal/knxnetip/Subscriber.go b/plc4go/internal/knxnetip/Subscriber.go
index 1ab0675939..19f3b64f96 100644
--- a/plc4go/internal/knxnetip/Subscriber.go
+++ b/plc4go/internal/knxnetip/Subscriber.go
@@ -64,9 +64,7 @@ func NewSubscriber(connection *Connection, _options ...options.WithOption) *Subs
 func (s *Subscriber) Subscribe(ctx context.Context, subscriptionRequest apiModel.PlcSubscriptionRequest) <-chan apiModel.PlcSubscriptionRequestResult {
// TODO: handle context
result := make(chan apiModel.PlcSubscriptionRequestResult, 1)
- s.wg.Add(1)
- go func() {
- defer s.wg.Done()
+ s.wg.Go(func() {
defer func() {
if err := recover(); err != nil {
 				result <- spiModel.NewDefaultPlcSubscriptionRequestResult(subscriptionRequest, nil, errors.Errorf("panic-ed %v. Stack: %s", err, debug.Stack()))
@@ -96,7 +94,7 @@ func (s *Subscriber) Subscribe(ctx context.Context, subscriptionRequest apiModel
),
nil,
)
- }()
+ })
return result
}
diff --git a/plc4go/internal/modbus/AsciiDriver.go b/plc4go/internal/modbus/AsciiDriver.go
index 068a4d307d..55eec7524c 100644
--- a/plc4go/internal/modbus/AsciiDriver.go
+++ b/plc4go/internal/modbus/AsciiDriver.go
@@ -93,9 +93,7 @@ func (d *AsciiDriver) GetConnectionWithContext(ctx context.Context, transportUrl
 	// Create a new codec for taking care of encoding/decoding of messages
 	// TODO: the code below looks strange: where is defaultChanel being used?
defaultChanel := make(chan any)
- d.wg.Add(1)
- go func() {
- defer d.wg.Done()
+ d.wg.Go(func() {
defer func() {
if err := recover(); err != nil {
d.log.Error().
@@ -109,7 +107,7 @@ func (d *AsciiDriver) GetConnectionWithContext(ctx context.Context, transportUrl
 			adu := msg.(model.ModbusTcpADU)
 			d.log.Debug().Stringer("adu", adu).Msg("got message in the default handler %s\n")
}
- }()
+ })
codec := NewMessageCodec(
transportInstance,
append(d._options, options.WithCustomLogger(d.log))...,
diff --git a/plc4go/internal/modbus/Connection.go b/plc4go/internal/modbus/Connection.go
index 4d73f9b43b..018956e5a4 100644
--- a/plc4go/internal/modbus/Connection.go
+++ b/plc4go/internal/modbus/Connection.go
@@ -110,9 +110,7 @@ func (c *Connection) Ping() <-chan plc4go.PlcConnectionPingResult {
ctx := context.TODO()
c.log.Trace().Msg("Pinging")
result := make(chan plc4go.PlcConnectionPingResult, 1)
- c.wg.Add(1)
- go func() {
- defer c.wg.Done()
+ c.wg.Go(func() {
defer func() {
if err := recover(); err != nil {
 				result <- _default.NewDefaultPlcConnectionPingResult(errors.Errorf("panic-ed %v. Stack: %s", err, debug.Stack()))
@@ -149,7 +147,7 @@ func (c *Connection) Ping() <-chan plc4go.PlcConnectionPingResult {
); err != nil {
 			result <- _default.NewDefaultPlcConnectionPingResult(err)
}
- }()
+ })
return result
}
diff --git a/plc4go/internal/modbus/Reader.go b/plc4go/internal/modbus/Reader.go
index 7609baec06..0f9cb72ae5 100644
--- a/plc4go/internal/modbus/Reader.go
+++ b/plc4go/internal/modbus/Reader.go
@@ -65,9 +65,7 @@ func (m *Reader) Read(ctx context.Context, readRequest apiModel.PlcReadRequest)
// TODO: handle ctx
m.log.Trace().Msg("Reading")
result := make(chan apiModel.PlcReadRequestResult, 1)
- m.wg.Add(1)
- go func() {
- defer m.wg.Done()
+ m.wg.Go(func() {
defer func() {
if err := recover(); err != nil {
 				result <- spiModel.NewDefaultPlcReadRequestResult(readRequest, nil, errors.Errorf("panic-ed %v. Stack: %s", err, debug.Stack()))
@@ -175,7 +173,7 @@ func (m *Reader) Read(ctx context.Context, readRequest apiModel.PlcReadRequest)
errors.Wrap(err, "error sending message"),
)
}
- }()
+ })
return result
}
diff --git a/plc4go/internal/modbus/RtuDriver.go b/plc4go/internal/modbus/RtuDriver.go
index 19b7cdc617..2a41b3113f 100644
--- a/plc4go/internal/modbus/RtuDriver.go
+++ b/plc4go/internal/modbus/RtuDriver.go
@@ -93,9 +93,7 @@ func (d *RtuDriver) GetConnectionWithContext(ctx context.Context, transportUrl u
 	// Create a new codec for taking care of encoding/decoding of messages
 	// TODO: the code below looks strange: where is defaultChanel being used?
defaultChanel := make(chan any)
- d.wg.Add(1)
- go func() {
- defer d.wg.Done()
+ d.wg.Go(func() {
defer func() {
if err := recover(); err != nil {
d.log.Error().
@@ -109,7 +107,7 @@ func (d *RtuDriver) GetConnectionWithContext(ctx context.Context, transportUrl u
 			adu := msg.(model.ModbusTcpADU)
 			d.log.Debug().Stringer("adu", adu).Msg("got message in the default handler")
}
- }()
+ })
codec := NewMessageCodec(
transportInstance,
append(d._options, options.WithCustomLogger(d.log))...,
diff --git a/plc4go/internal/modbus/TcpDriver.go b/plc4go/internal/modbus/TcpDriver.go
index 47c9ff3b0d..498e852d21 100644
--- a/plc4go/internal/modbus/TcpDriver.go
+++ b/plc4go/internal/modbus/TcpDriver.go
@@ -93,9 +93,7 @@ func (d *TcpDriver) GetConnectionWithContext(ctx context.Context, transportUrl u
 	// Create a new codec for taking care of encoding/decoding of messages
 	// TODO: the code below looks strange: where is defaultChanel being used?
defaultChanel := make(chan any)
- d.wg.Add(1)
- go func() {
- defer d.wg.Done()
+ d.wg.Go(func() {
defer func() {
if err := recover(); err != nil {
d.log.Error().
@@ -109,7 +107,7 @@ func (d *TcpDriver) GetConnectionWithContext(ctx context.Context, transportUrl u
 			adu := msg.(model.ModbusTcpADU)
 			d.log.Debug().Stringer("adu", adu).Msg("got message in the default handler")
}
- }()
+ })
codec := NewMessageCodec(
transportInstance,
append(d._options, options.WithCustomLogger(d.log))...,
diff --git a/plc4go/internal/modbus/Writer.go b/plc4go/internal/modbus/Writer.go
index 9ad3aa3b17..6309b7a97b 100644
--- a/plc4go/internal/modbus/Writer.go
+++ b/plc4go/internal/modbus/Writer.go
@@ -59,9 +59,7 @@ func NewWriter(unitIdentifier uint8, messageCodec spi.MessageCodec, _options ...
 func (m *Writer) Write(ctx context.Context, writeRequest apiModel.PlcWriteRequest) <-chan apiModel.PlcWriteRequestResult {
// TODO: handle context
result := make(chan apiModel.PlcWriteRequestResult, 1)
- m.wg.Add(1)
- go func() {
- defer m.wg.Done()
+ m.wg.Go(func() {
// If we are requesting only one tag, use a
if len(writeRequest.GetTagNames()) != 1 {
 			result <- spiModel.NewDefaultPlcWriteRequestResult(writeRequest, nil, errors.New("modbus only supports single-item requests"))
@@ -152,7 +150,7 @@ func (m *Writer) Write(ctx context.Context, writeRequest apiModel.PlcWriteReques
}
return nil
}, time.Second*1)
- }()
+ })
return result
}
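Note that the deferred recover guard survives the refactor unchanged in every driver goroutine; wg.Go only replaces the bookkeeping around it. A sketch of the combined shape, with result standing in for the various per-driver result channels:

	package main

	import (
		"fmt"
		"runtime/debug"
		"sync"
	)

	func main() {
		var wg sync.WaitGroup
		result := make(chan error, 1) // stand-in for the per-driver result channels

		wg.Go(func() {
			// The guard every driver goroutine keeps: a deferred recover that
			// converts a panic into an error value instead of killing the process.
			defer func() {
				if err := recover(); err != nil {
					result <- fmt.Errorf("panic-ed %v. Stack: %s", err, debug.Stack())
				}
			}()
			panic("simulated driver failure")
		})

		wg.Wait()
		fmt.Println(<-result)
	}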
diff --git a/plc4go/internal/opcua/Connection.go b/plc4go/internal/opcua/Connection.go
index 503e7a82c4..de7729b7cb 100644
--- a/plc4go/internal/opcua/Connection.go
+++ b/plc4go/internal/opcua/Connection.go
@@ -113,9 +113,7 @@ func (c *Connection) GetMessageCodec() spi.MessageCodec {
func (c *Connection) ConnectWithContext(ctx context.Context) <-chan plc4go.PlcConnectionConnectResult {
c.log.Trace().Msg("Connecting")
ch := make(chan plc4go.PlcConnectionConnectResult, 1)
- c.wg.Add(1)
- go func() {
- defer c.wg.Done()
+ c.wg.Go(func() {
defer func() {
if err := recover(); err != nil {
 				c.fireConnectionError(errors.Errorf("panic-ed %v. Stack:\n%s", err, debug.Stack()), ch)
@@ -146,15 +144,13 @@ func (c *Connection) ConnectWithContext(ctx context.Context) <-chan plc4go.PlcCo
}
c.setupConnection(ctx, ch)
- }()
+ })
return ch
}
func (c *Connection) Close() <-chan plc4go.PlcConnectionCloseResult {
results := make(chan plc4go.PlcConnectionCloseResult, 1)
- c.wg.Add(1)
- go func() {
- defer c.wg.Done()
+ c.wg.Go(func() {
result := <-c.DefaultConnection.Close()
c.channel.onDisconnect(context.Background(), c)
disconnectTimeout := time.NewTimer(c.disconnectTimeout)
@@ -165,7 +161,7 @@ func (c *Connection) Close() <-chan plc4go.PlcConnectionCloseResult {
case <-disconnectTimeout.C:
 			results <- _default.NewDefaultPlcConnectionCloseResult(c, errors.Errorf("timeout after %s", c.disconnectTimeout))
}
- }()
+ })
return results
}
diff --git a/plc4go/internal/opcua/SecureChannel.go b/plc4go/internal/opcua/SecureChannel.go
index c24db8979a..7a7afc83ba 100644
--- a/plc4go/internal/opcua/SecureChannel.go
+++ b/plc4go/internal/opcua/SecureChannel.go
@@ -1273,11 +1273,7 @@ func (s *SecureChannel) keepAlive() {
s.log.Warn().Msg("keepalive already running")
return
}
- s.keepAliveWg.Add(1)
- s.wg.Add(1)
- go func() {
- defer s.wg.Done()
- defer s.keepAliveWg.Done()
+ s.keepAliveWg.Go(func() {
s.keepAliveIndicator.Store(true)
defer s.keepAliveIndicator.Store(false)
defer s.log.Info().Msg("ending keepalive")
@@ -1424,7 +1420,7 @@ func (s *SecureChannel) keepAlive() {
s.log.Debug().Err(err).Msg("error submitting")
}
}
- }()
+ })
return
}
diff --git a/plc4go/internal/opcua/SubscriptionHandle.go b/plc4go/internal/opcua/SubscriptionHandle.go
index 6fea5c017b..24cb269a80 100644
--- a/plc4go/internal/opcua/SubscriptionHandle.go
+++ b/plc4go/internal/opcua/SubscriptionHandle.go
@@ -219,11 +219,7 @@ func (h *SubscriptionHandle) onSubscribeCreateMonitoredItemsRequest() (readWrite
func (h *SubscriptionHandle) startSubscriber() {
h.log.Trace().Msg("Starting Subscription")
- h.subscriberWg.Add(1)
- h.wg.Add(1)
- go func() {
- defer h.wg.Done()
- defer h.subscriberWg.Done()
+ h.subscriberWg.Go(func() {
 		var outstandingAcknowledgements []readWriteModel.SubscriptionAcknowledgement
var outstandingRequests []uint32
@@ -353,7 +349,7 @@ func (h *SubscriptionHandle) startSubscriber() {
 		//Wait for any outstanding responses to arrive, using the request timeout length
//sleep(this.revisedCycleTime * 10);
h.complete = true
- }()
+ })
}
// stopSubscriber stops the subscriber either on disconnect or on error
diff --git a/plc4go/internal/s7/Connection.go b/plc4go/internal/s7/Connection.go
index 7e1157a778..ad7a21c562 100644
--- a/plc4go/internal/s7/Connection.go
+++ b/plc4go/internal/s7/Connection.go
@@ -121,9 +121,7 @@ func (c *Connection) GetMessageCodec() spi.MessageCodec {
func (c *Connection) ConnectWithContext(ctx context.Context) <-chan plc4go.PlcConnectionConnectResult {
c.log.Trace().Msg("Connecting")
ch := make(chan plc4go.PlcConnectionConnectResult, 1)
- c.wg.Add(1)
- go func() {
- defer c.wg.Done()
+ c.wg.Go(func() {
defer func() {
if err := recover(); err != nil {
 				ch <- _default.NewDefaultPlcConnectionConnectResult(nil, errors.Errorf("panic-ed %v. Stack: %s", err, debug.Stack()))
@@ -156,7 +154,7 @@ func (c *Connection) ConnectWithContext(ctx context.Context) <-chan plc4go.PlcCo
c.log.Info().Msg("S7 Driver running in ACTIVE mode.")
c.setupConnection(ctx, ch)
- }()
+ })
return ch
}
diff --git a/plc4go/internal/s7/Reader.go b/plc4go/internal/s7/Reader.go
index d734353dd2..70a3603417 100644
--- a/plc4go/internal/s7/Reader.go
+++ b/plc4go/internal/s7/Reader.go
@@ -65,9 +65,7 @@ func (m *Reader) Read(ctx context.Context, readRequest apiModel.PlcReadRequest)
// TODO: handle ctx
m.log.Trace().Msg("Reading")
result := make(chan apiModel.PlcReadRequestResult, 1)
- m.wg.Add(1)
- go func() {
- defer m.wg.Done()
+ m.wg.Go(func() {
defer func() {
if err := recover(); err != nil {
 				result <- spiModel.NewDefaultPlcReadRequestResult(readRequest, nil, errors.Errorf("panic-ed %v. Stack: %s", err, debug.Stack()))
@@ -116,7 +114,9 @@ func (m *Reader) Read(ctx context.Context, readRequest apiModel.PlcReadRequest)
)
 		// Start a new request-transaction (Is ended in the response-handler)
transaction := m.tm.StartTransaction()
-		transaction.Submit(func(transaction transactions.RequestTransaction) {
+		transaction.Submit(func(transactionContext context.Context, transaction transactions.RequestTransaction) {
+ ctx, cancel := context.WithCancel(ctx)
+ context.AfterFunc(transactionContext, cancel)
// Send the over the wire
m.log.Trace().Msg("Send ")
@@ -176,7 +176,7 @@ func (m *Reader) Read(ctx context.Context, readRequest
apiModel.PlcReadRequest)
}
}
})
- }()
+ })
return result
}
diff --git a/plc4go/internal/s7/Writer.go b/plc4go/internal/s7/Writer.go
index 667fd6cab9..6c3e306ed2 100644
--- a/plc4go/internal/s7/Writer.go
+++ b/plc4go/internal/s7/Writer.go
@@ -60,9 +60,7 @@ func NewWriter(tpduGenerator *TpduGenerator, messageCodec spi.MessageCodec, tm t
 func (m *Writer) Write(ctx context.Context, writeRequest apiModel.PlcWriteRequest) <-chan apiModel.PlcWriteRequestResult {
// TODO: handle context
result := make(chan apiModel.PlcWriteRequestResult, 1)
- m.wg.Add(1)
- go func() {
- defer m.wg.Done()
+ m.wg.Go(func() {
defer func() {
if err := recover(); err != nil {
 				result <- spiModel.NewDefaultPlcWriteRequestResult(writeRequest, nil, errors.Errorf("panic-ed %v. Stack: %s", err, debug.Stack()))
@@ -109,7 +107,9 @@ func (m *Writer) Write(ctx context.Context, writeRequest apiModel.PlcWriteReques
 		// Start a new request-transaction (Is ended in the response-handler)
transaction := m.tm.StartTransaction()
-		transaction.Submit(func(transaction transactions.RequestTransaction) {
+		transaction.Submit(func(transactionContext context.Context, transaction transactions.RequestTransaction) {
+ ctx, cancel := context.WithCancel(ctx)
+ context.AfterFunc(transactionContext, cancel)
// Send the over the wire
 			if err := m.messageCodec.SendRequest(ctx, tpktPacket, func(message spi.Message) bool {
 				tpktPacket, ok := message.(readWriteModel.TPKTPacket)
@@ -160,7 +160,7 @@ func (m *Writer) Write(ctx context.Context, writeRequest apiModel.PlcWriteReques
}
}
})
- }()
+ })
return result
}
diff --git a/plc4go/internal/simulated/Connection.go b/plc4go/internal/simulated/Connection.go
index 8d1b558835..5915360b86 100644
--- a/plc4go/internal/simulated/Connection.go
+++ b/plc4go/internal/simulated/Connection.go
@@ -91,9 +91,7 @@ func (c *Connection) Connect() <-chan plc4go.PlcConnectionConnectResult {
 func (c *Connection) ConnectWithContext(_ context.Context) <-chan plc4go.PlcConnectionConnectResult {
ch := make(chan plc4go.PlcConnectionConnectResult, 1)
- c.wg.Add(1)
- go func() {
- defer c.wg.Done()
+ c.wg.Go(func() {
defer func() {
if err := recover(); err != nil {
 				ch <- _default.NewDefaultPlcConnectionCloseResult(nil, errors.Errorf("panic-ed %v. Stack: %s", err, debug.Stack()))
@@ -139,7 +137,7 @@ func (c *Connection) ConnectWithContext(_ context.Context) <-chan plc4go.PlcConn
 			// Return the connection in a connected state to the user.
 			ch <- _default.NewDefaultPlcConnectionConnectResult(c, nil)
}
- }()
+ })
return ch
}
@@ -149,9 +147,7 @@ func (c *Connection) BlockingClose() {
func (c *Connection) Close() <-chan plc4go.PlcConnectionCloseResult {
ch := make(chan plc4go.PlcConnectionCloseResult, 1)
- c.wg.Add(1)
- go func() {
- defer c.wg.Done()
+ c.wg.Go(func() {
defer func() {
if err := recover(); err != nil {
 				ch <- _default.NewDefaultPlcConnectionConnectResult(nil, errors.Errorf("panic-ed %v. Stack: %s", err, debug.Stack()))
@@ -187,7 +183,7 @@ func (c *Connection) Close() <-chan plc4go.PlcConnectionCloseResult {
}
// Return a new connection to the user.
ch <- _default.NewDefaultPlcConnectionCloseResult(c, nil)
- }()
+ })
return ch
}
@@ -197,9 +193,7 @@ func (c *Connection) IsConnected() bool {
func (c *Connection) Ping() <-chan plc4go.PlcConnectionPingResult {
ch := make(chan plc4go.PlcConnectionPingResult, 1)
- c.wg.Add(1)
- go func() {
- defer c.wg.Done()
+ c.wg.Go(func() {
defer func() {
if err := recover(); err != nil {
 				ch <- _default.NewDefaultPlcConnectionPingResult(errors.Errorf("panic-ed %v. Stack: %s", err, debug.Stack()))
@@ -242,7 +236,7 @@ func (c *Connection) Ping() <-chan plc4go.PlcConnectionPingResult {
}
ch <- _default.NewDefaultPlcConnectionPingResult(nil)
}
- }()
+ })
return ch
}
diff --git a/plc4go/internal/simulated/Connection_test.go b/plc4go/internal/simulated/Connection_test.go
index 9b26161ef1..41b42dff52 100644
--- a/plc4go/internal/simulated/Connection_test.go
+++ b/plc4go/internal/simulated/Connection_test.go
@@ -20,6 +20,7 @@
package simulated
import (
+ "sync"
"testing"
"time"
@@ -328,10 +329,12 @@ func TestConnection_BlockingClose(t *testing.T) {
timeBeforeClose := time.Now()
executor := func() <-chan bool {
ch := make(chan bool)
- go func() {
+ var wg sync.WaitGroup
+ t.Cleanup(wg.Wait)
+ wg.Go(func() {
c.BlockingClose()
ch <- true
- }()
+ })
return ch
}
timeout := time.NewTimer(3 * time.Second)
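The test-side changes all follow one recipe: a test-local WaitGroup whose Wait is registered via t.Cleanup, so the test cannot return while a helper goroutine is still logging or asserting against it. A minimal sketch (TestCleanupWait is illustrative only, not part of the patch):

	package example

	import (
		"sync"
		"testing"
	)

	// t.Cleanup(wg.Wait) runs after the test body and blocks until every
	// wg.Go goroutine has returned, so no goroutine can outlive the
	// *testing.T it touches.
	func TestCleanupWait(t *testing.T) {
		var wg sync.WaitGroup
		t.Cleanup(wg.Wait)

		done := make(chan struct{})
		wg.Go(func() {
			t.Log("helper goroutine ran")
			close(done)
		})
		<-done
	}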
diff --git a/plc4go/internal/simulated/Reader.go b/plc4go/internal/simulated/Reader.go
index 4650a6f911..d44815fc03 100644
--- a/plc4go/internal/simulated/Reader.go
+++ b/plc4go/internal/simulated/Reader.go
@@ -59,9 +59,7 @@ func NewReader(device *Device, readerOptions map[string][]string, tracer tracer.
 func (r *Reader) Read(_ context.Context, readRequest apiModel.PlcReadRequest) <-chan apiModel.PlcReadRequestResult {
ch := make(chan apiModel.PlcReadRequestResult, 1)
- r.wg.Add(1)
- go func() {
- defer r.wg.Done()
+ r.wg.Go(func() {
defer func() {
if err := recover(); err != nil {
 				ch <- spiModel.NewDefaultPlcReadRequestResult(readRequest, nil, errors.Errorf("panic-ed %v. Stack: %s", err, debug.Stack()))
@@ -111,6 +109,6 @@ func (r *Reader) Read(_ context.Context, readRequest apiModel.PlcReadRequest) <-
 			spiModel.NewDefaultPlcReadResponse(readRequest, responseCodes, responseValues),
nil,
)
- }()
+ })
return ch
}
diff --git a/plc4go/internal/simulated/Writer.go b/plc4go/internal/simulated/Writer.go
index e09e8bca17..4af1a3fe08 100644
--- a/plc4go/internal/simulated/Writer.go
+++ b/plc4go/internal/simulated/Writer.go
@@ -58,9 +58,7 @@ func NewWriter(device *Device, writerOptions map[string][]string, tracer tracer.
 func (w *Writer) Write(_ context.Context, writeRequest apiModel.PlcWriteRequest) <-chan apiModel.PlcWriteRequestResult {
ch := make(chan apiModel.PlcWriteRequestResult, 1)
- w.wg.Add(1)
- go func() {
- defer w.wg.Done()
+ w.wg.Go(func() {
defer func() {
if err := recover(); err != nil {
 				ch <- spiModel.NewDefaultPlcWriteRequestResult(writeRequest, nil, errors.Errorf("panic-ed %v. Stack: %s", err, debug.Stack()))
@@ -99,6 +97,6 @@ func (w *Writer) Write(_ context.Context, writeRequest apiModel.PlcWriteRequest)
}
// Emit the response
 		ch <- spiModel.NewDefaultPlcWriteRequestResult(writeRequest, spiModel.NewDefaultPlcWriteResponse(writeRequest, responseCodes), nil)
- }()
+ })
return ch
}
diff --git a/plc4go/pkg/api/cache/PlcConnectionCache.go b/plc4go/pkg/api/cache/PlcConnectionCache.go
index 6872891bef..fc1c0ba616 100644
--- a/plc4go/pkg/api/cache/PlcConnectionCache.go
+++ b/plc4go/pkg/api/cache/PlcConnectionCache.go
@@ -162,9 +162,7 @@ func (c *plcConnectionCache) GetConnection(connectionString string) <-chan plc4g
 func (c *plcConnectionCache) GetConnectionWithContext(ctx context.Context, connectionString string) <-chan plc4go.PlcConnectionConnectResult {
ch := make(chan plc4go.PlcConnectionConnectResult)
- c.wg.Add(1)
- go func() {
- defer c.wg.Done()
+ c.wg.Go(func() {
c.cacheLock.Lock()
 		// If a connection for this connection string didn't exist yet, create a new container
@@ -181,9 +179,9 @@ func (c *plcConnectionCache) GetConnectionWithContext(ctx context.Context, conne
 		// Store the new connection container in the cache of connections.
c.connections[connectionString] = cc
// Initialize the connection itself.
- go func(cc2 *connectionContainer) {
- cc2.connect(ctx)
- }(cc)
+ c.wg.Go(func() {
+ cc.connect(ctx)
+ })
}
// Get the ConnectionContainer for this connection string.
@@ -229,19 +227,17 @@ func (c *plcConnectionCache) GetConnectionWithContext(ctx context.Context, conne
 		case <-maximumWaitTimeout.C: // Timeout after the maximum waiting time.
 			// In this case we need to drain the chan and return it immediate
- c.wg.Add(1)
- go func() {
- defer c.wg.Done()
+ c.wg.Go(func() {
<-leaseChan
_ = connection.returnConnection(ctx, StateIdle)
- }()
+ })
if c.tracer != nil {
 				c.tracer.AddTransactionalTrace(txId, "get-connection", "timeout")
 			}
 			c.log.Debug().Str("connectionString", connectionString).Msg("Timeout while waiting for connection.")
 			ch <- _default.NewDefaultPlcConnectionCloseResult(nil, errors.New("timeout while waiting for connection"))
}
- }()
+ })
return ch
}
@@ -250,9 +246,7 @@ func (c *plcConnectionCache) Close() <-chan PlcConnectionCacheCloseResult {
c.log.Debug().Msg("Closing connection cache started.")
ch := make(chan PlcConnectionCacheCloseResult)
- c.wg.Add(1)
- go func() {
- defer c.wg.Done()
+ c.wg.Go(func() {
c.log.Trace().Msg("Acquire lock")
c.cacheLock.Lock()
defer c.cacheLock.Unlock()
@@ -268,16 +262,16 @@ func (c *plcConnectionCache) Close() <-chan PlcConnectionCacheCloseResult {
return
}
- for _, cc := range c.connections {
- ccLog := c.log.With().Stringer("cc", cc).Logger()
+ for _, connectionContainer := range c.connections {
+			ccLog := c.log.With().Stringer("connectionContainer", connectionContainer).Logger()
 			ccLog.Trace().Msg("Closing connection")
 			// Mark the connection as being closed to not try to re-establish it.
-			cc.closed = true
+			connectionContainer.closed = true
 			// Try to get a lease as this way we kow we're not closing the connection
// while some go func is still using it.
- go func(container *connectionContainer) {
+ c.wg.Go(func() {
ccLog.Trace().Msg("getting a lease")
- leaseResults := container.lease()
+ leaseResults := connectionContainer.lease()
closeTimeout := time.NewTimer(c.maxWaitTime)
select {
 			// We're just getting the lease as this way we can be sure nobody else is using it.
@@ -286,16 +280,16 @@ func (c *plcConnectionCache) Close() <-chan PlcConnectionCacheCloseResult {
case _ = <-leaseResults:
 				ccLog.Debug().Msg("Gracefully closing connection ...")
 				// Give back the connection.
-				if container.connection != nil {
+				if connectionContainer.connection != nil {
 					ccLog.Trace().Msg("closing actual connection")
-					container.connection.Close()
+					connectionContainer.connection.Close()
 				}
 			// If we're timing out brutally kill the connection.
 			case <-closeTimeout.C:
 				ccLog.Debug().Msg("Forcefully closing connection ...")
 				// Forcefully close this connection.
-				if container.connection != nil {
-					container.connection.Close()
+				if connectionContainer.connection != nil {
+					connectionContainer.connection.Close()
 				}
 			}
@@ -306,9 +300,9 @@ func (c *plcConnectionCache) Close() <-chan PlcConnectionCacheCloseResult {
 			case <-responseDeliveryTimeout.C:
 			}
 			c.log.Debug().Msg("Closing connection cache finished.")
- }(cc)
+ })
}
- }()
+ })
return ch
}
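The rename from the pass-the-loop-variable form go func(container *connectionContainer){...}(cc) to a direct capture inside c.wg.Go leans on Go 1.22's per-iteration loop variables; before 1.22 every iteration shared one variable and the parameter was the standard workaround. A sketch of why the direct capture is now safe (items/results are illustrative data, not from the patch):

	package main

	import (
		"fmt"
		"sync"
	)

	func main() {
		items := []string{"a", "b", "c"}
		results := make(chan string, len(items))

		var wg sync.WaitGroup
		for _, item := range items {
			// Since Go 1.22 each iteration declares a fresh item, so capturing
			// it directly is safe; previously closures had to take the value
			// as a parameter, exactly like the removed go func(container ...).
			wg.Go(func() {
				results <- item
			})
		}
		wg.Wait()
		close(results)
		for r := range results {
			fmt.Println(r)
		}
	}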
diff --git a/plc4go/pkg/api/cache/PlcConnectionCache_test.go b/plc4go/pkg/api/cache/PlcConnectionCache_test.go
index d0f7ac1ff9..0013453a2e 100644
--- a/plc4go/pkg/api/cache/PlcConnectionCache_test.go
+++ b/plc4go/pkg/api/cache/PlcConnectionCache_test.go
@@ -20,6 +20,7 @@
package cache
import (
+ "sync"
"testing"
"time"
@@ -230,11 +231,9 @@ func (c *plcConnectionCache) readFromPlc(t *testing.T, preConnectJob func(), con
closeResults := connection.Close()
// Wait for the connection to be correctly closed.
closeResult := <-closeResults
- c.wg.Add(1)
- go func() {
- defer c.wg.Done()
+ c.wg.Go(func() {
 			ch <- (closeResult.(_default.DefaultPlcConnectionCloseResult)).GetTraces()
- }()
+ })
}()
// Prepare a read request.
@@ -264,9 +263,7 @@ func (c *plcConnectionCache) readFromPlc(t *testing.T, preConnectJob func(), con
 func (c *plcConnectionCache) executeAndTestReadFromPlc(t *testing.T, preConnectJob func(), connectionString string, resourceString string, expectedTraceEntries []string, expectedNumTotalConnections int) <-chan struct{} {
ch := make(chan struct{})
- c.wg.Add(1)
- go func() {
- defer c.wg.Done()
+ c.wg.Go(func() {
// Read once from the c.
 		traces := <-c.readFromPlc(t, preConnectJob, connectionString, resourceString)
@@ -287,7 +284,7 @@ func (c *plcConnectionCache) executeAndTestReadFromPlc(t *testing.T, preConnectJ
 			t.Errorf("Expected %d connections in the c but got %d", expectedNumTotalConnections, len(c.connections))
ch <- struct{}{}
- }()
+ })
return ch
}
@@ -875,8 +872,10 @@ func TestPlcConnectionCache_MaximumWaitTimeReached(t *testing.T) {
 	// The third connection should be given up by the cache
 	thirdConnectionResults := cache.GetConnection("simulated://1.2.3.4:42?connectionDelay=100&pingDelay=4000&traceEnabled=true")
+ var wg sync.WaitGroup
+ t.Cleanup(wg.Wait)
 	// Just make sure the first two connections are returned as soon as they are received
- go func() {
+ wg.Go(func() {
select {
case connectionResult := <-firstConnectionResults:
if assert.NotNil(t, connectionResult) {
@@ -888,8 +887,8 @@ func TestPlcConnectionCache_MaximumWaitTimeReached(t *testing.T) {
case <-time.After(5 * time.Second):
t.Errorf("Timeout")
}
- }()
- go func() {
+ })
+ wg.Go(func() {
select {
case connectionResult := <-secondConnectionResults:
if assert.NotNil(t, connectionResult) {
@@ -901,7 +900,7 @@ func TestPlcConnectionCache_MaximumWaitTimeReached(t *testing.T) {
case <-time.After(5 * time.Second):
t.Errorf("Timeout")
}
- }()
+ })
// Now wait for the last connection to be timed out by the cache
select {
diff --git a/plc4go/pkg/api/cache/connectionContainer.go b/plc4go/pkg/api/cache/connectionContainer.go
index 92d73b4a24..a203d631af 100644
--- a/plc4go/pkg/api/cache/connectionContainer.go
+++ b/plc4go/pkg/api/cache/connectionContainer.go
@@ -162,11 +162,9 @@ func (c *connectionContainer) lease() <-chan plc4go.PlcConnectionConnectResult {
// is definitely eagerly waiting for input.
c.log.Debug().Str("connectionString", c.connectionString).
Msg("Got lease instantly as connection was idle.")
- c.wg.Add(1)
- go func() {
- defer c.wg.Done()
+ c.wg.Go(func() {
 			ch <- _default.NewDefaultPlcConnectionConnectResult(connection, nil)
- }()
+ })
case StateInUse, StateInitialized:
 		// If the connection is currently busy or not finished initializing,
// add the new channel to the queue for this connection.
@@ -213,9 +211,7 @@ func (c *connectionContainer) returnConnection(ctx context.Context, newState cac
 		// Send asynchronously as the receiver might have given up waiting,
 		// and we don'c want anything to block here. 1ms should be enough for
// the calling process to reach the blocking read.
- c.wg.Add(1)
- go func() {
- defer c.wg.Done()
+ c.wg.Go(func() {
// In this case we don'c need to check for blocks
// as the getConnection function of the connection cache
// is definitely eagerly waiting for input.
@@ -223,7 +219,7 @@ func (c *connectionContainer) returnConnection(ctx context.Context, newState cac
 			c.log.Debug().Str("connectionString", c.connectionString).
 				Int("waiting-queue-size", len(c.queue)).
 				Msg("Returned connection to the next client waiting.")
- }()
+ })
} else {
// Otherwise, just mark the connection as idle.
c.log.Debug().Str("connectionString", c.connectionString).
diff --git a/plc4go/pkg/api/cache/plcConnectionLease.go b/plc4go/pkg/api/cache/plcConnectionLease.go
index aef78f60a8..e2860442c9 100644
--- a/plc4go/pkg/api/cache/plcConnectionLease.go
+++ b/plc4go/pkg/api/cache/plcConnectionLease.go
@@ -98,9 +98,7 @@ func (t *plcConnectionLease) Close() <-chan plc4go.PlcConnectionCloseResult {
result := make(chan plc4go.PlcConnectionCloseResult, 1)
- t.wg.Add(1)
- go func() {
- defer t.wg.Done()
+ t.wg.Go(func() {
 		// Check if the connection is still alive, if it is, put it back into the cache
pingResults := t.Ping()
pingTimeout := time.NewTimer(5 * time.Second)
@@ -143,7 +141,7 @@ func (t *plcConnectionLease) Close() <-chan plc4go.PlcConnectionCloseResult {
// Finish closing the connection.
 		result <- _default.NewDefaultPlcConnectionCloseResultWithTraces(t, err, traces)
- }()
+ })
return result
}
diff --git a/plc4go/spi/default/DefaultBrowser.go b/plc4go/spi/default/DefaultBrowser.go
index d7d6c73300..69cf6722b3 100644
--- a/plc4go/spi/default/DefaultBrowser.go
+++ b/plc4go/spi/default/DefaultBrowser.go
@@ -78,9 +78,7 @@ func (m *defaultBrowser) Browse(ctx context.Context, browseRequest apiModel.PlcB
 func (m *defaultBrowser) BrowseWithInterceptor(ctx context.Context, browseRequest apiModel.PlcBrowseRequest, interceptor func(result apiModel.PlcBrowseItem) bool) <-chan apiModel.PlcBrowseRequestResult {
result := make(chan apiModel.PlcBrowseRequestResult, 1)
- m.wg.Add(1)
- go func() {
- defer m.wg.Done()
+ m.wg.Go(func() {
defer func() {
if err := recover(); err != nil {
m.log.Error().
@@ -101,6 +99,6 @@ func (m *defaultBrowser) BrowseWithInterceptor(ctx context.Context, browseReques
browseResponse,
nil,
)
- }()
+ })
return result
}
diff --git a/plc4go/spi/default/DefaultCodec.go b/plc4go/spi/default/DefaultCodec.go
index b4fc71dae8..498695a483 100644
--- a/plc4go/spi/default/DefaultCodec.go
+++ b/plc4go/spi/default/DefaultCodec.go
@@ -215,21 +215,21 @@ func (m *defaultCodec) TimeoutExpectations(now time.Time) {
// Remove this expectation from the list.
 			m.log.Debug().Stringer("expectation", expectation).Msg("timeout expectation")
// Call the error handler.
- go func(expectation spi.Expectation) {
+ m.wg.Go(func() {
 				if err := expectation.GetHandleError()(utils.NewTimeoutError(expectation.GetExpiration().Sub(expectation.GetCreationTime()))); err != nil {
 					m.log.Error().Err(err).Msg("Got an error handling error on expectation")
}
- }(expectation)
+ })
return true
}
if err := expectation.GetContext().Err(); err != nil {
 			m.log.Debug().Err(err).Stringer("expectation", expectation).Msg("expectation canceled")
// Remove this expectation from the list.
- go func(expectation spi.Expectation) {
+ m.wg.Go(func() {
 				if err := expectation.GetHandleError()(err); err != nil {
 					m.log.Error().Err(err).Msg("Got an error handling error on expectation")
}
- }(expectation)
+ })
return true
}
return false
@@ -252,11 +252,11 @@ func (m *defaultCodec) HandleMessages(message spi.Message) bool {
 		if err := expectation.GetHandleMessage()(message); err != nil {
 			expectationLog.Debug().Err(err).Msg("errored handling the message")
// Pass the error to the error handler.
- go func(expectation spi.Expectation) {
+ m.wg.Go(func() {
 				if err := expectation.GetHandleError()(err); err != nil {
 					m.log.Error().Err(err).Msg("Got an error handling error on expectation")
}
- }(expectation)
+ })
return false
}
m.log.Trace().Msg("message handled")
@@ -273,12 +273,10 @@ func (m *defaultCodec) HandleMessages(message spi.Message) bool {
func (m *defaultCodec) startWorker() {
m.log.Trace().Msg("starting worker")
- m.activeWorker.Add(1)
- go m.Work()
+ m.activeWorker.Go(m.Work)
}
func (m *defaultCodec) Work() {
- defer m.activeWorker.Done()
workerLog := m.log.With().Logger()
if !m.traceDefaultMessageCodecWorker {
workerLog = zerolog.Nop()
@@ -335,16 +333,14 @@ mainLoop:
var err error
{
syncer := make(chan struct{})
- m.wg.Add(1)
- go func() {
- defer m.wg.Done()
+ m.wg.Go(func() {
defer close(syncer)
if !m.running.Load() {
err = errors.New("not running")
return
}
message, err = m.Receive()
- }()
+ })
timeoutTimer := time.NewTimer(m.receiveTimeout)
select {
case <-syncer:
diff --git a/plc4go/spi/default/DefaultCodec_test.go b/plc4go/spi/default/DefaultCodec_test.go
index c0ba8ea0c7..074901ab9a 100644
--- a/plc4go/spi/default/DefaultCodec_test.go
+++ b/plc4go/spi/default/DefaultCodec_test.go
@@ -23,6 +23,7 @@ import (
"context"
"fmt"
"os"
+ "sync"
"sync/atomic"
"testing"
"time"
@@ -1440,11 +1441,13 @@ func Test_defaultCodec_Work(t *testing.T) {
if tt.manipulator != nil {
tt.manipulator(t, m)
}
- go func() {
+ var wg sync.WaitGroup
+ t.Cleanup(wg.Wait)
+ wg.Go(func() {
// Stop after 200ms
time.Sleep(200 * time.Millisecond)
m.running.Store(false)
- }()
+ })
m.Work()
})
}
diff --git a/plc4go/spi/default/DefaultConnection.go b/plc4go/spi/default/DefaultConnection.go
index e55573ca94..00a823d843 100644
--- a/plc4go/spi/default/DefaultConnection.go
+++ b/plc4go/spi/default/DefaultConnection.go
@@ -152,9 +152,7 @@ func (d *defaultConnection) Connect() <-chan plc4go.PlcConnectionConnectResult {
 func (d *defaultConnection) ConnectWithContext(ctx context.Context) <-chan plc4go.PlcConnectionConnectResult {
d.log.Trace().Msg("Connecting")
ch := make(chan plc4go.PlcConnectionConnectResult, 1)
- d.wg.Add(1)
- go func() {
- defer d.wg.Done()
+ d.wg.Go(func() {
defer func() {
if err := recover(); err != nil {
 				ch <- NewDefaultPlcConnectionConnectResult(nil, errors.Errorf("panic-ed %v. Stack: %s", err, debug.Stack()))
@@ -164,7 +162,7 @@ func (d *defaultConnection) ConnectWithContext(ctx context.Context) <-chan plc4g
d.SetConnected(true)
connection := d.GetConnection()
ch <- NewDefaultPlcConnectionConnectResult(connection, err)
- }()
+ })
return ch
}
@@ -217,9 +215,7 @@ func (d *defaultConnection) IsConnected() bool {
func (d *defaultConnection) Ping() <-chan plc4go.PlcConnectionPingResult {
ch := make(chan plc4go.PlcConnectionPingResult, 1)
- d.wg.Add(1)
- go func() {
- defer d.wg.Done()
+ d.wg.Go(func() {
defer func() {
if err := recover(); err != nil {
 				ch <- NewDefaultPlcConnectionPingResult(errors.Errorf("panic-ed %v. Stack: %s", err, debug.Stack()))
@@ -230,7 +226,7 @@ func (d *defaultConnection) Ping() <-chan plc4go.PlcConnectionPingResult {
} else {
 			ch <- NewDefaultPlcConnectionPingResult(errors.New("not connected"))
}
- }()
+ })
return ch
}
diff --git a/plc4go/spi/model/DefaultPlcReadRequest.go b/plc4go/spi/model/DefaultPlcReadRequest.go
index de5c4a2109..09e6a2b0fd 100644
--- a/plc4go/spi/model/DefaultPlcReadRequest.go
+++ b/plc4go/spi/model/DefaultPlcReadRequest.go
@@ -150,9 +150,7 @@ func (d *DefaultPlcReadRequest) ExecuteWithContextAndInterceptor(ctx context.Con
 	// Create a new result-channel, which completes as soon as all sub-result-channels have returned
resultChannel := make(chan apiModel.PlcReadRequestResult, 1)
- d.wg.Add(1)
- go func() {
- defer d.wg.Done()
+ d.wg.Go(func() {
defer func() {
if err := recover(); err != nil {
 				resultChannel <- NewDefaultPlcReadRequestResult(d, nil, errors.Errorf("panic-ed %v. Stack: %s", err, debug.Stack()))
@@ -173,6 +171,6 @@ func (d *DefaultPlcReadRequest) ExecuteWithContextAndInterceptor(ctx context.Con
 	result := d.readRequestInterceptor.ProcessReadResponses(ctx, d, subResults)
// Return the final result
resultChannel <- result
- }()
+ })
return resultChannel
}
diff --git a/plc4go/spi/model/DefaultPlcUnsubscriptionRequest.go b/plc4go/spi/model/DefaultPlcUnsubscriptionRequest.go
index 6b9e570318..8450551d4d 100644
--- a/plc4go/spi/model/DefaultPlcUnsubscriptionRequest.go
+++ b/plc4go/spi/model/DefaultPlcUnsubscriptionRequest.go
@@ -70,9 +70,7 @@ func (d *DefaultPlcUnsubscriptionRequest) Execute() <-chan apiModel.PlcUnsubscri
 func (d *DefaultPlcUnsubscriptionRequest) ExecuteWithContext(ctx context.Context) <-chan apiModel.PlcUnsubscriptionRequestResult {
results := make(chan apiModel.PlcUnsubscriptionRequestResult, 1)
- d.wg.Add(1)
- go func() {
- defer d.wg.Done()
+ d.wg.Go(func() {
var collectedErrors []error
for _, handle := range d.subscriptionHandles {
select {
@@ -90,7 +88,7 @@ func (d *DefaultPlcUnsubscriptionRequest) ExecuteWithContext(ctx context.Context
 			finalErr = errors.Wrap(err, "error unsubscribing from all")
 		}
 		results <- NewDefaultPlcUnsubscriptionRequestResult(d, NewDefaultPlcUnsubscriptionResponse(d), finalErr)
- }()
+ })
return results
}
diff --git a/plc4go/spi/model/DefaultPlcWriteRequest.go b/plc4go/spi/model/DefaultPlcWriteRequest.go
index dc7c2e7d84..c99cd6776d 100644
--- a/plc4go/spi/model/DefaultPlcWriteRequest.go
+++ b/plc4go/spi/model/DefaultPlcWriteRequest.go
@@ -170,9 +170,7 @@ func (d *DefaultPlcWriteRequest) ExecuteWithContextAndInterceptor(ctx context.Co
 	// Create a new result-channel, which completes as soon as all sub-result-channels have returned
resultChannel := make(chan apiModel.PlcWriteRequestResult, 1)
- d.wg.Add(1)
- go func() {
- defer d.wg.Done()
+ d.wg.Go(func() {
defer func() {
if err := recover(); err != nil {
 				resultChannel <- NewDefaultPlcWriteRequestResult(d, nil, errors.Errorf("panic-ed %v. Stack: %s", err, debug.Stack()))
@@ -193,7 +191,7 @@ func (d *DefaultPlcWriteRequest) ExecuteWithContextAndInterceptor(ctx context.Co
 	result := d.writeRequestInterceptor.ProcessWriteResponses(ctx, d, subResults)
// Return the final result
resultChannel <- result
- }()
+ })
return resultChannel
}
diff --git a/plc4go/spi/pool/WorkerPool.go b/plc4go/spi/pool/WorkerPool.go
index 7611ce242e..8bbc1c0149 100644
--- a/plc4go/spi/pool/WorkerPool.go
+++ b/plc4go/spi/pool/WorkerPool.go
@@ -27,7 +27,7 @@ import (
"github.com/apache/plc4x/plc4go/spi/options"
)
-type Runnable func()
+type Runnable func(ctx context.Context)
type CompletionFuture interface {
AwaitCompletion(ctx context.Context) error
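This one-line type change is what threads the new ctx through everything above: a Runnable now receives the context of the executor running it, so long waits can race against pool shutdown instead of sleeping blindly (compare the test update below). A self-contained sketch of a well-behaved Runnable under the new signature:

	package main

	import (
		"context"
		"fmt"
		"time"
	)

	// Runnable matches the new pool signature.
	type Runnable func(ctx context.Context)

	func main() {
		var r Runnable = func(ctx context.Context) {
			timer := time.NewTimer(3 * time.Second)
			defer timer.Stop()
			select {
			case <-timer.C:
				fmt.Println("finished normally")
			case <-ctx.Done(): // pool shutdown interrupts the wait
				fmt.Println("interrupted:", ctx.Err())
			}
		}

		ctx, cancel := context.WithCancel(context.Background())
		go func() {
			time.Sleep(50 * time.Millisecond)
			cancel() // what executor.Stop() effectively does now
		}()
		r(ctx)
	}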
diff --git a/plc4go/spi/pool/WorkerPool_test.go
b/plc4go/spi/pool/WorkerPool_test.go
index 5436391537..19f4b61d42 100644
--- a/plc4go/spi/pool/WorkerPool_test.go
+++ b/plc4go/spi/pool/WorkerPool_test.go
@@ -20,7 +20,9 @@
package pool
import (
+ "context"
"math/rand"
+ "sync"
"testing"
"time"
@@ -137,21 +139,29 @@ func TestNewDynamicExecutor(t *testing.T) {
 					timeToBecomeUnused = 100 * time.Millisecond
}
t.Log("fill some jobs")
- go func() {
+ var wg sync.WaitGroup
+ t.Cleanup(wg.Wait)
+ wg.Go(func() {
for i := 0; i < 500; i++ {
e.workItems <- workItem{
workItemId: int32(i),
-				runnable: func() {
+				runnable: func(runnableCtx context.Context) {
+					ctx, cancel := context.WithCancel(t.Context())
+					context.AfterFunc(runnableCtx, cancel)
 					max := 100
 					min := 10
 					sleepTime := time.Duration(rand.Intn(max-min)+min) * time.Millisecond
 					t.Logf("Sleeping for %v", sleepTime)
-					time.Sleep(sleepTime)
+					timer := time.NewTimer(sleepTime)
+					select {
+					case <-timer.C:
+					case <-ctx.Done():
+					}
 				},
 				completionFuture: &future{},
}
}
- }()
+ })
},
 			executorValidator: func(t *testing.T, e *dynamicExecutor) bool {
time.Sleep(500 * time.Millisecond)
diff --git a/plc4go/spi/pool/dynamicExecutor.go b/plc4go/spi/pool/dynamicExecutor.go
index 748f49f0eb..f3796d6217 100644
--- a/plc4go/spi/pool/dynamicExecutor.go
+++ b/plc4go/spi/pool/dynamicExecutor.go
@@ -44,8 +44,6 @@ type dynamicExecutor struct {
interrupter chan struct{}
wg sync.WaitGroup // use to track spawned go routines
-
- dynamicWorkers sync.WaitGroup
}
func newDynamicExecutor(queueDepth, maxNumberOfWorkers int, log zerolog.Logger) *dynamicExecutor {
@@ -65,18 +63,14 @@ func (e *dynamicExecutor) Start() {
if e.interrupter != nil {
 		e.log.Debug().Msg("Ensuring that the old spawner/killers are not running")
close(e.interrupter)
- e.dynamicWorkers.Wait()
+ e.wg.Wait()
}
e.executor.Start()
mutex := sync.Mutex{}
e.interrupter = make(chan struct{})
// Worker spawner
- e.wg.Add(1)
- go func() {
- defer e.wg.Done()
- e.dynamicWorkers.Add(1)
- defer e.dynamicWorkers.Done()
+ e.wg.Go(func() {
defer func() {
if err := recover(); err != nil {
e.log.Error().
@@ -123,13 +117,9 @@ func (e *dynamicExecutor) Start() {
}()
}
workerLog.Info().Msg("Terminated")
- }()
+ })
// Worker killer
- e.dynamicWorkers.Add(1)
- e.wg.Add(1)
- go func() {
- defer e.wg.Done()
- defer e.dynamicWorkers.Done()
+ e.wg.Go(func() {
defer func() {
if err := recover(); err != nil {
e.log.Error().
@@ -181,7 +171,7 @@ func (e *dynamicExecutor) Start() {
}()
}
workerLog.Info().Msg("Terminated")
- }()
+ })
}
func (e *dynamicExecutor) Stop() {
@@ -199,7 +189,7 @@ func (e *dynamicExecutor) Stop() {
e.log.Debug().
 		Interface("currentNumberOfWorkers", e.currentNumberOfWorkers.Load()).
 		Msg("waiting for currentNumberOfWorkers dynamic workers to stop")
- e.dynamicWorkers.Wait()
+ e.wg.Wait()
e.log.Trace().Msg("stopped")
}
diff --git a/plc4go/spi/pool/dynamicExecutor_test.go b/plc4go/spi/pool/dynamicExecutor_test.go
index 536d1544e9..0b66250570 100644
--- a/plc4go/spi/pool/dynamicExecutor_test.go
+++ b/plc4go/spi/pool/dynamicExecutor_test.go
@@ -20,6 +20,7 @@
package pool
import (
+ "context"
"sync/atomic"
"testing"
"time"
@@ -86,7 +87,7 @@ func Test_dynamicExecutor_Start(t *testing.T) {
},
setup: func(t *testing.T, fields *fields) {
fields.executor.log = produceTestingLogger(t)
-				fields.executor.workItems <- workItem{1, func() {}, &future{}}
+				fields.executor.workItems <- workItem{1, func(context.Context) {}, &future{}}
},
},
{
@@ -101,7 +102,7 @@ func Test_dynamicExecutor_Start(t *testing.T) {
},
setup: func(t *testing.T, fields *fields) {
fields.executor.log = produceTestingLogger(t)
-				fields.executor.workItems <- workItem{1, func() {}, &future{}}
+				fields.executor.workItems <- workItem{1, func(context.Context) {}, &future{}}
},
startTwice: true,
},
@@ -152,7 +153,7 @@ func Test_dynamicExecutor_Stop(t *testing.T) {
},
setup: func(t *testing.T, fields *fields) {
fields.executor.log = produceTestingLogger(t)
-				fields.executor.workItems <- workItem{1, func() {}, &future{}}
+				fields.executor.workItems <- workItem{1, func(context.Context) {}, &future{}}
},
},
{
@@ -167,7 +168,7 @@ func Test_dynamicExecutor_Stop(t *testing.T) {
},
setup: func(t *testing.T, fields *fields) {
fields.executor.log = produceTestingLogger(t)
-				fields.executor.workItems <- workItem{1, func() {}, &future{}}
+				fields.executor.workItems <- workItem{1, func(context.Context) {}, &future{}}
},
},
{
@@ -182,7 +183,7 @@ func Test_dynamicExecutor_Stop(t *testing.T) {
},
setup: func(t *testing.T, fields *fields) {
fields.executor.log = produceTestingLogger(t)
-				fields.executor.workItems <- workItem{1, func() {}, &future{}}
+				fields.executor.workItems <- workItem{1, func(context.Context) {}, &future{}}
},
stopTwice: true,
},
diff --git a/plc4go/spi/pool/executor.go b/plc4go/spi/pool/executor.go
index 22a5977fdb..a63e6e8bc3 100644
--- a/plc4go/spi/pool/executor.go
+++ b/plc4go/spi/pool/executor.go
@@ -39,6 +39,9 @@ type executor struct {
workItems chan workItem
traceWorkers bool
+ ctx context.Context
+ ctxCancel context.CancelFunc
+
stateChange sync.RWMutex
workerWaitGroup sync.WaitGroup
@@ -50,6 +53,7 @@ func newExecutor(queueDepth int, numberOfInitialWorkers int, customLogger zerolo
workItems: make(chan workItem, queueDepth),
log: customLogger,
}
+ e.ctx, e.ctxCancel = context.WithCancel(context.Background())
workers := make([]*worker, numberOfInitialWorkers)
for i := 0; i < numberOfInitialWorkers; i++ {
w := newWorker(customLogger, i, e)
@@ -71,6 +75,10 @@ func (e *executor) getWorkerWaitGroup() *sync.WaitGroup {
return &e.workerWaitGroup
}
+func (e *executor) getCtx() context.Context {
+ return e.ctx
+}
+
func (e *executor) Submit(ctx context.Context, workItemId int32, runnable Runnable) CompletionFuture {
if runnable == nil {
value := atomic.Value{}
@@ -127,11 +135,13 @@ func (e *executor) Stop() {
}
e.shutdown = true
for i := 0; i < len(e.worker); i++ {
+ e.log.Debug().Int("workerId", i).Msg("stopping worker")
e.worker[i].stop(true)
}
e.running = false
e.shutdown = false
 	e.log.Debug().Int("nWorkers", len(e.worker)).Msg("waiting for nWorkers workers to stop")
+ e.ctxCancel()
e.workerWaitGroup.Wait()
e.log.Trace().Msg("stopped")
}
diff --git a/plc4go/spi/pool/executor_plc4xgen.go b/plc4go/spi/pool/executor_plc4xgen.go
index 873919c971..48aa2406f1 100644
--- a/plc4go/spi/pool/executor_plc4xgen.go
+++ b/plc4go/spi/pool/executor_plc4xgen.go
@@ -93,6 +93,44 @@ func (d *executor) SerializeWithWriteBuffer(ctx context.Context, writeBuffer uti
 	if err := writeBuffer.WriteBit("traceWorkers", d.traceWorkers); err != nil {
return err
}
+
+	if d.ctx != nil {
+		if serializableField, ok := any(d.ctx).(utils.Serializable); ok {
+			if err := writeBuffer.PushContext("ctx"); err != nil {
+				return err
+			}
+			if err := serializableField.SerializeWithWriteBuffer(ctx, writeBuffer); err != nil {
+				return err
+			}
+			if err := writeBuffer.PopContext("ctx"); err != nil {
+				return err
+			}
+		} else {
+			stringValue := fmt.Sprintf("%v", d.ctx)
+			if err := writeBuffer.WriteString("ctx", uint32(len(stringValue)*8), stringValue); err != nil {
+				return err
+			}
+		}
+	}
+
+	if d.ctxCancel != nil {
+		if serializableField, ok := any(d.ctxCancel).(utils.Serializable); ok {
+			if err := writeBuffer.PushContext("ctxCancel"); err != nil {
+				return err
+			}
+			if err := serializableField.SerializeWithWriteBuffer(ctx, writeBuffer); err != nil {
+				return err
+			}
+			if err := writeBuffer.PopContext("ctxCancel"); err != nil {
+				return err
+			}
+		} else {
+			stringValue := fmt.Sprintf("%v", d.ctxCancel)
+			if err := writeBuffer.WriteString("ctxCancel", uint32(len(stringValue)*8), stringValue); err != nil {
+				return err
+			}
+		}
+	}
if err := writeBuffer.PopContext("executor"); err != nil {
return err
}
diff --git a/plc4go/spi/pool/executor_test.go b/plc4go/spi/pool/executor_test.go
index 1a9d658bf8..b67d6bcd22 100644
--- a/plc4go/spi/pool/executor_test.go
+++ b/plc4go/spi/pool/executor_test.go
@@ -259,9 +259,12 @@ func Test_executor_Submit(t *testing.T) {
},
args: args{
workItemId: 13,
- runnable: func() {
+ runnable: func(ctx context.Context) {
// We do something for 3 seconds
- <-time.NewTimer(3 * time.Second).C
+ select {
+ case <-time.NewTimer(3 * time.Second).C:
+ case <-ctx.Done():
+ }
},
context: func() context.Context {
 				ctx, cancelFunc := context.WithCancel(context.Background())
@@ -281,9 +284,12 @@ func Test_executor_Submit(t *testing.T) {
},
args: args{
workItemId: 13,
- runnable: func() {
+ runnable: func(ctx context.Context) {
// We do something for 3 seconds
- <-time.NewTimer(3 * time.Second).C
+ select {
+ case <-time.NewTimer(3 * time.Second).C:
+ case <-ctx.Done():
+ }
},
context: t.Context(),
},
@@ -306,7 +312,7 @@ func Test_executor_Submit(t *testing.T) {
}(),
args: args{
workItemId: 13,
- runnable: func() {
+ runnable: func(_ context.Context) {
// NOOP
},
context: t.Context(),
diff --git a/plc4go/spi/pool/worker.go b/plc4go/spi/pool/worker.go
index 8a11c41428..60b95cbe6e 100644
--- a/plc4go/spi/pool/worker.go
+++ b/plc4go/spi/pool/worker.go
@@ -20,6 +20,7 @@
package pool
import (
+ "context"
"runtime/debug"
"sync"
"sync/atomic"
@@ -35,6 +36,7 @@ type worker struct {
isTraceWorkers() bool
getWorksItems() chan workItem
getWorkerWaitGroup() *sync.WaitGroup
+ getCtx() context.Context
}
lastReceived atomic.Value
func newWorker(localLog zerolog.Logger, workerId int, executor interface {
isTraceWorkers() bool
getWorksItems() chan workItem
getWorkerWaitGroup() *sync.WaitGroup
+ getCtx() context.Context
}) *worker {
w := &worker{
id: workerId,
@@ -82,9 +85,8 @@ func (w *worker) start() {
if w.executor.isTraceWorkers() {
w.log.Debug().Stringer("worker", w).Msg("Starting worker")
}
- w.executor.getWorkerWaitGroup().Add(1)
w.running.Store(true)
- go w.work()
+ w.executor.getWorkerWaitGroup().Go(w.work)
}
func (w *worker) stop(interrupt bool) {
@@ -106,7 +108,6 @@ func (w *worker) stop(interrupt bool) {
}
func (w *worker) work() {
- defer w.executor.getWorkerWaitGroup().Done()
defer func() {
if err := recover(); err != nil {
w.log.Error().
@@ -137,12 +138,15 @@ func (w *worker) work() {
// TODO: do we need to complete with a error?
} else {
workItemLog.Debug().Msg("Running work item")
- _workItem.runnable()
+ _workItem.runnable(w.executor.getCtx())
_workItem.completionFuture.complete()
workItemLog.Debug().Msg("work item completed")
}
case <-w.interrupter:
workerLog.Debug().Msg("We got interrupted")
+ case <-w.executor.getCtx().Done():
+ workerLog.Debug().Msg("Ctx done")
+ return
}
}
workerLog.Trace().Msg("done")
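The third select arm is the payoff of giving the executor its own context: a worker blocked on an empty work-item channel now wakes up when Stop() calls ctxCancel(). A stripped-down sketch of the loop shape (work/workItem here are simplified stand-ins for the real types):

	package main

	import (
		"context"
		"fmt"
		"time"
	)

	type workItem struct{ id int32 } // simplified stand-in

	// work serves items until its interrupter fires or the executor-wide
	// context is cancelled — the new exit path added in this commit.
	func work(ctx context.Context, items <-chan workItem, interrupter <-chan struct{}) {
		for {
			select {
			case item := <-items:
				fmt.Println("running work item", item.id)
			case <-interrupter:
				fmt.Println("interrupted")
				return
			case <-ctx.Done():
				fmt.Println("ctx done")
				return
			}
		}
	}

	func main() {
		ctx, cancel := context.WithCancel(context.Background())
		items := make(chan workItem, 1)
		items <- workItem{id: 1}
		go func() {
			time.Sleep(50 * time.Millisecond)
			cancel() // executor.Stop() cancelling e.ctx
		}()
		work(ctx, items, make(chan struct{}))
	}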
diff --git a/plc4go/spi/pool/worker_test.go b/plc4go/spi/pool/worker_test.go
index bdd96b12cf..1e1a8060ab 100644
--- a/plc4go/spi/pool/worker_test.go
+++ b/plc4go/spi/pool/worker_test.go
@@ -20,6 +20,7 @@
package pool
import (
+ "context"
"sync"
"sync/atomic"
"testing"
@@ -37,6 +38,7 @@ func Test_worker_initialize(t *testing.T) {
isTraceWorkers() bool
getWorksItems() chan workItem
getWorkerWaitGroup() *sync.WaitGroup
+ getCtx() context.Context
}
}
tests := []struct {
@@ -67,6 +69,7 @@ func Test_worker_start(t *testing.T) {
isTraceWorkers() bool
getWorksItems() chan workItem
getWorkerWaitGroup() *sync.WaitGroup
+ getCtx() context.Context
}
lastReceived atomic.Value
interrupter chan struct{}
@@ -86,7 +89,7 @@ func Test_worker_start(t *testing.T) {
}
e.workItems <- workItem{
workItemId: 0,
-					runnable: func() {
+					runnable: func(context.Context) {
// No-op
},
completionFuture: &future{},
@@ -105,7 +108,7 @@ func Test_worker_start(t *testing.T) {
}
e.workItems <- workItem{
workItemId: 0,
-					runnable: func() {
+					runnable: func(context.Context) {
// No-op
},
completionFuture: &future{},
@@ -146,6 +149,7 @@ func Test_worker_stop(t *testing.T) {
isTraceWorkers() bool
getWorksItems() chan workItem
getWorkerWaitGroup() *sync.WaitGroup
+ getCtx() context.Context
}
lastReceived atomic.Value
interrupter chan struct{}
@@ -168,15 +172,17 @@ func Test_worker_stop(t *testing.T) {
 					workItems:    make(chan workItem),
traceWorkers: true,
}
- go func() {
+ var wg sync.WaitGroup
+ t.Cleanup(wg.Wait)
+ wg.Go(func() {
e.workItems <- workItem{
workItemId: 0,
-						runnable: func() {
+						runnable: func(context.Context) {
// No-op
},
 						completionFuture: &future{},
}
- }()
+ })
return e
}(),
},
@@ -226,15 +232,17 @@ func Test_worker_work(t *testing.T) {
 					workItems:    make(chan workItem),
traceWorkers: true,
}
- go func() {
+ var wg sync.WaitGroup
+ t.Cleanup(wg.Wait)
+ wg.Go(func() {
e.workItems <- workItem{
workItemId: 0,
-						runnable: func() {
+						runnable: func(context.Context) {
 							panic("Oh no what should I do???")
},
 						completionFuture: &future{},
}
- }()
+ })
return e
}(),
},
@@ -260,15 +268,17 @@ func Test_worker_work(t *testing.T) {
 					workItems:    make(chan workItem),
traceWorkers: true,
}
- go func() {
+ var wg sync.WaitGroup
+ t.Cleanup(wg.Wait)
+ wg.Go(func() {
e.workItems <- workItem{
workItemId: 0,
-						runnable: func() {
+						runnable: func(context.Context) {
time.Sleep(time.Millisecond * 70)
},
 						completionFuture: &future{},
}
- }()
+ })
return e
}(),
},
@@ -318,17 +328,19 @@ func Test_worker_work(t *testing.T) {
 					workItems:    make(chan workItem),
traceWorkers: true,
}
- go func() {
+ var wg sync.WaitGroup
+ t.Cleanup(wg.Wait)
+ wg.Go(func() {
completionFuture := &future{}
completionFuture.cancelRequested.Store(true)
e.workItems <- workItem{
workItemId: 0,
-						runnable: func() {
+						runnable: func(context.Context) {
time.Sleep(time.Millisecond * 70)
},
 						completionFuture: completionFuture,
}
- }()
+ })
return e
}(),
},
diff --git a/plc4go/spi/transactions/RequestTransaction.go
b/plc4go/spi/transactions/RequestTransaction.go
index 2b6be82778..4b0aa6467b 100644
--- a/plc4go/spi/transactions/RequestTransaction.go
+++ b/plc4go/spi/transactions/RequestTransaction.go
@@ -129,9 +129,9 @@ func (t *requestTransaction) Submit(operation RequestTransactionRunnable) {
t.log.Warn().Msg("Operation already set")
}
t.log.Trace().Int32("transactionId", t.transactionId).Msg("Submission")
- t.operation = func() {
+ t.operation = func(ctx context.Context) {
 		t.log.Trace().Int32("transactionId", t.transactionId).Msg("Start operation")
- operation(t)
+ operation(ctx, t)
 		t.log.Trace().Int32("transactionId", t.transactionId).Msg("Completed operation")
}
t.parent.submitTransaction(t)
diff --git a/plc4go/spi/transactions/RequestTransactionManager.go b/plc4go/spi/transactions/RequestTransactionManager.go
index b4dd3c7fb4..0fc5dcf055 100644
--- a/plc4go/spi/transactions/RequestTransactionManager.go
+++ b/plc4go/spi/transactions/RequestTransactionManager.go
@@ -60,7 +60,7 @@ func init() {
}()
}
-type RequestTransactionRunnable func(RequestTransaction)
+type RequestTransactionRunnable func(context.Context, RequestTransaction)
// RequestTransactionManager handles transactions
type RequestTransactionManager interface {
@@ -77,7 +77,7 @@ type RequestTransactionManager interface {
 func NewRequestTransactionManager(numberOfConcurrentRequests int, _options ...options.WithOption) RequestTransactionManager {
 	extractTraceTransactionManagerTransactions, _ := options.ExtractTraceTransactionManagerTransactions(_options...)
 	customLogger := options.ExtractCustomLoggerOrDefaultToGlobal(_options...)
- _requestTransactionManager := &requestTransactionManager{
+ rtm := &requestTransactionManager{
numberOfConcurrentRequests: numberOfConcurrentRequests,
currentTransactionId: 0,
workLog: *list.New(),
@@ -87,13 +87,14 @@ func NewRequestTransactionManager(numberOfConcurrentRequests int, _options ...op
log: customLogger,
}
+ rtm.ctx, rtm.cancelCtx = context.WithCancel(context.Background())
for _, option := range _options {
switch option := option.(type) {
case *withCustomExecutor:
- _requestTransactionManager.executor = option.executor
+ rtm.executor = option.executor
}
}
- return _requestTransactionManager
+ return rtm
}
// WithCustomExecutor sets a custom Executor for the RequestTransactionManager
@@ -129,6 +130,9 @@ type requestTransactionManager struct {
shutdown atomic.Bool // Indicates it this rtm is in shutdown
+ ctx context.Context `ignore:"true"`
+ cancelCtx context.CancelFunc `ignore:"true"`
+
 	traceTransactionManagerTransactions bool // flag set to true if it should trace transactions
log zerolog.Logger
@@ -184,7 +188,7 @@ func (r *requestTransactionManager) processWorklog() {
Int("nRunningRequests", len(r.runningRequests)).
 		Msg("Handling next. (Adding to running requests (length: nRunningRequests))")
r.runningRequests = append(r.runningRequests, next)
-	completionFuture := r.executor.Submit(context.Background(), next.transactionId, next.operation)
+	completionFuture := r.executor.Submit(r.ctx, next.transactionId, next.operation)
next.setCompletionFuture(completionFuture)
r.workLog.Remove(front)
}
@@ -284,6 +288,7 @@ func (r *requestTransactionManager) CloseGraceful(timeout time.Duration) error {
} else {
r.log.Warn().Msg("not closing shared instance")
}
+ r.cancelCtx()
r.log.Debug().Msg("closed")
return nil
}
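The manager-side equivalent of the Runnable change: transaction operations now run under r.ctx, and CloseGraceful's cancelCtx() call can abort an in-flight operation that is blocked on I/O. A compact sketch of the new RequestTransactionRunnable contract, with manager/transaction as simplified stand-ins for the real types:

	package main

	import (
		"context"
		"fmt"
		"time"
	)

	type transaction struct{ id int32 }

	type runnable func(ctx context.Context, t transaction)

	type manager struct {
		ctx    context.Context
		cancel context.CancelFunc
	}

	// submit runs an operation under the manager's lifetime context, as
	// processWorklog now does via r.executor.Submit(r.ctx, ...).
	func (m *manager) submit(op runnable, t transaction) { go op(m.ctx, t) }

	func main() {
		ctx, cancel := context.WithCancel(context.Background())
		m := &manager{ctx: ctx, cancel: cancel}

		done := make(chan struct{})
		m.submit(func(ctx context.Context, t transaction) {
			defer close(done)
			select {
			case <-time.After(3 * time.Second):
				fmt.Println("transaction", t.id, "completed")
			case <-ctx.Done():
				fmt.Println("transaction", t.id, "aborted:", ctx.Err())
			}
		}, transaction{id: 1})

		time.Sleep(50 * time.Millisecond)
		m.cancel() // what CloseGraceful now triggers
		<-done
	}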
diff --git a/plc4go/spi/transactions/RequestTransactionManager_test.go b/plc4go/spi/transactions/RequestTransactionManager_test.go
index 76868fa91f..484894d660 100644
--- a/plc4go/spi/transactions/RequestTransactionManager_test.go
+++ b/plc4go/spi/transactions/RequestTransactionManager_test.go
@@ -26,8 +26,8 @@ import (
"testing"
"time"
- "github.com/rs/zerolog/log"
"github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
"github.com/apache/plc4x/plc4go/spi/options"
"github.com/apache/plc4x/plc4go/spi/pool"
@@ -39,17 +39,23 @@ func TestNewRequestTransactionManager(t *testing.T) {
requestTransactionManagerOptions []options.WithOption
}
tests := []struct {
- name string
- args args
- setup func(t *testing.T, args *args)
- want RequestTransactionManager
+ name string
+ args args
+ setup func(t *testing.T, args *args)
+ wantAssert func(*testing.T, RequestTransactionManager) bool
}{
{
name: "just create one",
- want: &requestTransactionManager{
- workLog: *list.New(),
- executor: sharedExecutorInstance,
- log: log.Logger,
+			wantAssert: func(t *testing.T, rtm RequestTransactionManager) bool {
+				require.IsType(t, &requestTransactionManager{}, rtm)
+				rtmi := rtm.(*requestTransactionManager)
+				assert.NotNil(t, rtm)
+				assert.NotNil(t, rtmi.executor)
+				assert.Same(t, rtmi.executor, sharedExecutorInstance)
+				assert.NotNil(t, rtmi.workLog)
+				assert.NotNil(t, rtmi.ctx)
+				assert.NotNil(t, rtmi.cancelCtx)
+				return true
 			},
},
{
@@ -60,11 +66,17 @@ func TestNewRequestTransactionManager(t *testing.T) {
WithCustomExecutor(sharedExecutorInstance),
},
},
- want: &requestTransactionManager{
- numberOfConcurrentRequests: 2,
- workLog: *list.New(),
- executor: sharedExecutorInstance,
- log: log.Logger,
+ wantAssert: func(t *testing.T, rtm RequestTransactionManager) bool {
+ require.IsType(t, &requestTransactionManager{}, rtm)
+ rtmi := rtm.(*requestTransactionManager)
+ assert.NotNil(t, rtm)
+ assert.NotNil(t, rtmi.executor)
+ assert.Same(t, rtmi.executor, sharedExecutorInstance)
+ assert.NotNil(t, rtmi.workLog)
+ assert.NotNil(t, rtmi.ctx)
+ assert.NotNil(t, rtmi.cancelCtx)
+ assert.Equal(t, 2, rtmi.numberOfConcurrentRequests)
+ return true
},
},
}
@@ -73,8 +85,9 @@ func TestNewRequestTransactionManager(t *testing.T) {
if tt.setup != nil {
tt.setup(t, &tt.args)
}
- if got := NewRequestTransactionManager(tt.args.numberOfConcurrentRequests, tt.args.requestTransactionManagerOptions...); !assert.Equal(t, tt.want, got) {
- t.Errorf("NewRequestTransactionManager() = %v, want %v", got, tt.want)
+ got := NewRequestTransactionManager(tt.args.numberOfConcurrentRequests, tt.args.requestTransactionManagerOptions...)
+ if !assert.True(t, tt.wantAssert(t, got)) {
+ t.Errorf("NewRequestTransactionManager() = %v", got)
}
})
}
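
Why the test above moved from a literal want struct to a wantAssert callback: reflect.DeepEqual, which assert.Equal relies on, treats two funcs as equal only when both are nil, so an expected struct can never match once it carries a context.CancelFunc. A small demonstration; holder is an invented type:

    package main

    import (
        "context"
        "fmt"
        "reflect"
    )

    type holder struct {
        cancel context.CancelFunc
    }

    func main() {
        _, c1 := context.WithCancel(context.Background())
        _, c2 := context.WithCancel(context.Background())
        defer c1()
        defer c2()
        // Non-nil funcs are never DeepEqual, so equality-based test
        // expectations break once a CancelFunc lands in the struct.
        fmt.Println(reflect.DeepEqual(holder{c1}, holder{c2})) // false
        fmt.Println(reflect.DeepEqual(holder{}, holder{}))     // true (both nil)
    }
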
@@ -458,7 +471,7 @@ func Test_requestTransactionManager_submitTransaction(t *testing.T) {
name: "submit it",
args: args{
handle: &requestTransaction{
- operation: func() {
+ operation: func(context.Context) {
// doesn't matter
},
},
@@ -492,6 +505,8 @@ func Test_requestTransactionManager_Close(t *testing.T) {
currentTransactionId int32
workLog list.List
executor pool.Executor
+ ctx context.Context
+ cancelCtx context.CancelFunc
traceTransactionManagerTransactions bool
}
tests := []struct {
@@ -503,6 +518,7 @@ func Test_requestTransactionManager_Close(t *testing.T) {
{
name: "close it",
setup: func(t *testing.T, fields *fields) {
+ fields.ctx, fields.cancelCtx = context.WithCancel(t.Context())
executor := NewMockExecutor(t)
executor.EXPECT().Close().Return(nil)
fields.executor = executor
@@ -522,6 +538,8 @@ func Test_requestTransactionManager_Close(t *testing.T) {
workLog: tt.fields.workLog,
executor: tt.fields.executor,
traceTransactionManagerTransactions: tt.fields.traceTransactionManagerTransactions,
+ ctx: tt.fields.ctx,
+ cancelCtx: tt.fields.cancelCtx,
log: produceTestingLogger(t),
}
tt.wantErr(t, r.Close(), fmt.Sprintf("Close()"))
@@ -536,6 +554,8 @@ func Test_requestTransactionManager_CloseGraceful(t *testing.T) {
currentTransactionId int32
workLog list.List
executor pool.Executor
+ ctx context.Context
+ cancelCtx context.CancelFunc
traceTransactionManagerTransactions bool
}
type args struct {
@@ -551,6 +571,7 @@ func Test_requestTransactionManager_CloseGraceful(t *testing.T) {
{
name: "close it",
setup: func(t *testing.T, fields *fields) {
+ fields.ctx, fields.cancelCtx = context.WithCancel(t.Context())
executor := NewMockExecutor(t)
executor.EXPECT().Close().Return(nil)
fields.executor = executor
@@ -563,6 +584,7 @@ func Test_requestTransactionManager_CloseGraceful(t *testing.T) {
timeout: 20 * time.Millisecond,
},
setup: func(t *testing.T, fields *fields) {
+ fields.ctx, fields.cancelCtx = context.WithCancel(t.Context())
executor := NewMockExecutor(t)
executor.EXPECT().Close().Return(nil)
fields.executor = executor
@@ -580,6 +602,7 @@ func Test_requestTransactionManager_CloseGraceful(t *testing.T) {
timeout: 20 * time.Millisecond,
},
setup: func(t *testing.T, fields *fields) {
+ fields.ctx, fields.cancelCtx = context.WithCancel(t.Context())
executor := NewMockExecutor(t)
executor.EXPECT().Close().Return(nil)
fields.executor = executor
@@ -599,6 +622,8 @@ func Test_requestTransactionManager_CloseGraceful(t *testing.T) {
workLog: tt.fields.workLog,
executor: tt.fields.executor,
traceTransactionManagerTransactions: tt.fields.traceTransactionManagerTransactions,
+ ctx: tt.fields.ctx,
+ cancelCtx: tt.fields.cancelCtx,
log: produceTestingLogger(t),
}
tt.wantErr(t, r.CloseGraceful(tt.args.timeout), fmt.Sprintf("CloseGraceful(%v)", tt.args.timeout))
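
The fixtures above seed ctx/cancelCtx from testing.T.Context (available since Go 1.24), which the test framework cancels shortly before Cleanup callbacks run, so nothing derived from it can outlive the test. A minimal sketch of that wiring, with invented names:

    package transactions_test

    import (
        "context"
        "testing"
    )

    func TestFixtureContext(t *testing.T) {
        // t.Context() is scoped to the test: it is cancelled just before
        // Cleanup callbacks run.
        ctx, cancelCtx := context.WithCancel(t.Context())
        defer cancelCtx()

        select {
        case <-ctx.Done():
            t.Fatal("context should still be live during the test body")
        default: // expected: the context is live while the test runs
        }
    }
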
diff --git a/plc4go/spi/transactions/RequestTransaction_test.go b/plc4go/spi/transactions/RequestTransaction_test.go
index 2f8ead7770..57ec0990ba 100644
--- a/plc4go/spi/transactions/RequestTransaction_test.go
+++ b/plc4go/spi/transactions/RequestTransaction_test.go
@@ -21,6 +21,7 @@ package transactions
import (
"context"
+ "sync"
"testing"
"time"
@@ -230,7 +231,7 @@ func Test_requestTransaction_Submit(t1 *testing.T) {
parent: &requestTransactionManager{},
},
args: args{
- operation: func(_ RequestTransaction) {
+ operation: func(context.Context, RequestTransaction) {
// NOOP
},
},
@@ -239,12 +240,12 @@ func Test_requestTransaction_Submit(t1 *testing.T) {
name: "submit something again",
fields: fields{
parent: &requestTransactionManager{},
- operation: func() {
+ operation: func(context.Context) {
// NOOP
},
},
args: args{
- operation: func(_ RequestTransaction) {
+ operation: func(context.Context, RequestTransaction) {
// NOOP
},
},
@@ -253,29 +254,29 @@ func Test_requestTransaction_Submit(t1 *testing.T) {
name: "submit completed",
fields: fields{
parent: &requestTransactionManager{},
- operation: func() {
+ operation: func(context.Context) {
// NOOP
},
completed: true,
},
args: args{
- operation: func(_ RequestTransaction) {
+ operation: func(context.Context, RequestTransaction) {
// NOOP
},
},
},
}
for _, tt := range tests {
- t1.Run(tt.name, func(t1 *testing.T) {
- t := &requestTransaction{
+ t1.Run(tt.name, func(t *testing.T) {
+ rt := &requestTransaction{
parent: tt.fields.parent,
transactionId: tt.fields.transactionId,
operation: tt.fields.operation,
log: tt.fields.transactionLog,
completed: tt.fields.completed,
}
- t.Submit(tt.args.operation)
- t.operation()
+ rt.Submit(tt.args.operation)
+ rt.operation(t.Context())
})
}
}
@@ -317,13 +318,15 @@ func Test_requestTransaction_AwaitCompletion(t1 *testing.T) {
expect.AwaitCompletion(mock.Anything).Return(nil)
var completionFuture pool.CompletionFuture = completionFutureMock
transaction.completionFuture.Store(&completionFuture)
- go func() {
+ var wg sync.WaitGroup
+ t.Cleanup(wg.Wait)
+ wg.Go(func() {
time.Sleep(100 * time.Millisecond)
r := transaction.parent
r.workLogMutex.RLock()
defer r.workLogMutex.RUnlock()
r.runningRequests = append(r.runningRequests, &requestTransaction{transactionId: 1})
- }()
+ })
},
},
}
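
The wg.Go calls appearing from here on are sync.WaitGroup.Go, added in Go 1.25; it performs exactly the Add/Done bookkeeping it replaces throughout this commit. A side-by-side sketch:

    package main

    import (
        "fmt"
        "sync"
    )

    func main() {
        var wg sync.WaitGroup

        // Before: the caller pairs Add with a deferred Done by hand.
        wg.Add(1)
        go func() {
            defer wg.Done()
            fmt.Println("manual Add/Done")
        }()

        // After: wg.Go does the same bookkeeping internally (Go 1.25+).
        wg.Go(func() {
            fmt.Println("wg.Go")
        })

        wg.Wait()
    }
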
diff --git a/plc4go/spi/transports/pcap/TransportInstance.go b/plc4go/spi/transports/pcap/TransportInstance.go
index f61ab760bc..d6ecc63452 100644
--- a/plc4go/spi/transports/pcap/TransportInstance.go
+++ b/plc4go/spi/transports/pcap/TransportInstance.go
@@ -55,6 +55,8 @@ type TransportInstance struct {
handle *pcap.Handle
reader *bufio.Reader
+ wg sync.WaitGroup
+
log zerolog.Logger
}
@@ -95,7 +97,7 @@ func (m *TransportInstance) Connect() error {
buffer := new(bytes.Buffer)
m.reader = bufio.NewReader(buffer)
- go func(m *TransportInstance, buffer *bytes.Buffer) {
+ m.wg.Go(func() {
defer func() {
if err := recover(); err != nil {
m.log.Error().
@@ -161,7 +163,7 @@ func (m *TransportInstance) Connect() error {
buffer.Write(payload)
lastPacketTime = &captureInfo.Timestamp
}
- }(m, buffer)
+ })
return nil
}
@@ -174,6 +176,7 @@ func (m *TransportInstance) Close() error {
handle.Close()
}
m.connected.Store(false)
+ m.wg.Wait()
return nil
}
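
The pcap change shows the shape this commit unifies on: the transport owns its WaitGroup, Connect starts the background loop through wg.Go, and Close blocks on wg.Wait so the goroutine cannot outlive the instance. A simplified runnable sketch; the instance type and its loop body are invented stand-ins for the real read loop:

    package main

    import (
        "fmt"
        "sync"
        "sync/atomic"
        "time"
    )

    type instance struct {
        connected atomic.Bool
        wg        sync.WaitGroup
    }

    func (m *instance) Connect() {
        m.connected.Store(true)
        m.wg.Go(func() { // Go 1.25+: Add/Done handled internally
            for m.connected.Load() {
                time.Sleep(5 * time.Millisecond) // stand-in for the read loop
            }
        })
    }

    func (m *instance) Close() {
        m.connected.Store(false)
        m.wg.Wait() // Close now blocks until the background goroutine exits
        fmt.Println("closed cleanly")
    }

    func main() {
        i := &instance{}
        i.Connect()
        i.Close()
    }
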
diff --git a/plc4go/spi/transports/tcp/TransportInstance_test.go b/plc4go/spi/transports/tcp/TransportInstance_test.go
index 4f847343bf..6dc0ff69cb 100644
--- a/plc4go/spi/transports/tcp/TransportInstance_test.go
+++ b/plc4go/spi/transports/tcp/TransportInstance_test.go
@@ -23,6 +23,7 @@ import (
"bufio"
"context"
"net"
+ "sync"
"testing"
"github.com/rs/zerolog/log"
@@ -102,9 +103,11 @@ func TestTransportInstance_Close(t *testing.T) {
t.Cleanup(func() {
assert.NoError(t, listener.Close())
})
- go func() {
+ var wg sync.WaitGroup
+ t.Cleanup(wg.Wait)
+ wg.Go(func() {
_, _ = listener.Accept()
- }()
+ })
tcp, err := net.DialTCP("tcp", nil, listener.Addr().(*net.TCPAddr))
require.NoError(t, err)
t.Cleanup(func() {
@@ -206,9 +209,11 @@ func TestTransportInstance_ConnectWithContext(t *testing.T) {
t.Cleanup(func() {
assert.NoError(t, listener.Close())
})
- go func() {
+ var wg sync.WaitGroup
+ t.Cleanup(wg.Wait)
+ wg.Go(func() {
_, _ = listener.Accept()
- }()
+ })
return listener.Addr().(*net.TCPAddr)
}(),
},
@@ -401,9 +406,11 @@ func TestTransportInstance_Write(t *testing.T) {
t.Cleanup(func() {
assert.NoError(t, listener.Close())
})
- go func() {
+ var wg sync.WaitGroup
+ t.Cleanup(wg.Wait)
+ wg.Go(func() {
_, _ = listener.Accept()
- }()
+ })
tcp, err := net.DialTCP("tcp", nil, listener.Addr().(*net.TCPAddr))
require.NoError(t, err)
t.Cleanup(func() {
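
One subtlety in the tests above: t.Cleanup callbacks run last-registered-first, so the wg.Wait registered after the listener's Close cleanup actually runs before it and relies on the subsequent DialTCP to unblock Accept. A standalone sketch of the ordering that stays deadlock-free even when no client dials in (names invented):

    package tcp_test

    import (
        "net"
        "sync"
        "testing"
    )

    func TestListenerCleanupOrder(t *testing.T) {
        listener, err := net.Listen("tcp", "127.0.0.1:0")
        if err != nil {
            t.Fatal(err)
        }

        var wg sync.WaitGroup
        // Registering wg.Wait first means the listener's Close (registered
        // second) runs before it, unblocking Accept, so Wait cannot hang.
        t.Cleanup(wg.Wait)
        t.Cleanup(func() { _ = listener.Close() })

        wg.Go(func() {
            _, _ = listener.Accept() // returns with an error once the listener closes
        })
    }
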
diff --git a/plc4go/spi/transports/udp/TransportInstance.go b/plc4go/spi/transports/udp/TransportInstance.go
index b66ecb8f7c..291b997481 100644
--- a/plc4go/spi/transports/udp/TransportInstance.go
+++ b/plc4go/spi/transports/udp/TransportInstance.go
@@ -109,9 +109,7 @@ func (m *TransportInstance) ConnectWithContext(ctx context.Context) error {
}
// TODO: Start a worker that uses m.udpConn.ReadFromUDP() to fill a buffer
- /*m.wg.Add(1)
- go func() {
- defer m.wg.Done()
+ /* m.wg.Go(func() {
buf := make([]byte, 1024)
for {
rsize, raddr, err := m.udpConn.ReadFromUDP(buf)
diff --git a/plc4go/spi/transports/utils/DefaultBufferedTransportInstance.go b/plc4go/spi/transports/utils/DefaultBufferedTransportInstance.go
index 5b1381fb25..cb8eaac832 100644
--- a/plc4go/spi/transports/utils/DefaultBufferedTransportInstance.go
+++ b/plc4go/spi/transports/utils/DefaultBufferedTransportInstance.go
@@ -64,9 +64,7 @@ type defaultBufferedTransportInstance struct {
// ConnectWithContext is a compatibility implementation for those transports not implementing this function
func (m *defaultBufferedTransportInstance) ConnectWithContext(ctx context.Context) error {
ch := make(chan error, 1)
- m.wg.Add(1)
- go func() {
- defer m.wg.Done()
+ m.wg.Go(func() {
defer func() {
if err := recover(); err != nil {
m.log.Error().
@@ -77,7 +75,7 @@ func (m *defaultBufferedTransportInstance) ConnectWithContext(ctx context.Contex
}()
ch <- m.Connect()
close(ch)
- }()
+ })
select {
case err := <-ch:
return err
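
The buffered-transport change keeps the blocking Connect on a goroutine tracked by the WaitGroup and races its result against the caller's context. A self-contained sketch of the same idea; connectWithContext and blockingConnect are invented names, not the PLC4X API:

    package main

    import (
        "context"
        "errors"
        "fmt"
        "sync"
        "time"
    )

    func connectWithContext(ctx context.Context, wg *sync.WaitGroup, blockingConnect func() error) error {
        ch := make(chan error, 1) // buffered: the goroutine never blocks on send after a timeout
        wg.Go(func() {
            ch <- blockingConnect()
            close(ch)
        })
        select {
        case err := <-ch:
            return err
        case <-ctx.Done():
            return errors.New("timeout while connecting")
        }
    }

    func main() {
        ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
        defer cancel()
        var wg sync.WaitGroup
        err := connectWithContext(ctx, &wg, func() error {
            time.Sleep(time.Second) // simulate a slow connect
            return nil
        })
        fmt.Println(err)
        wg.Wait() // the tracked goroutine still finishes and is reaped
    }
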
diff --git a/plc4go/spi/utils/StopWarn.go b/plc4go/spi/utils/StopWarn.go
index 246ae1fac7..cfdeea6581 100644
--- a/plc4go/spi/utils/StopWarn.go
+++ b/plc4go/spi/utils/StopWarn.go
@@ -58,9 +58,7 @@ func StopWarn(localLog zerolog.Logger, opts ...func(*stopWarnOptions)) func() {
ticker := time.NewTicker(o.interval)
wg := new(sync.WaitGroup)
done := make(chan struct{})
- wg.Add(1)
- go func() {
- defer wg.Done()
+ wg.Go(func() {
localLog.Trace().Msgf("start checking")
startTime := time.Now()
for {
@@ -94,7 +92,7 @@ func StopWarn(localLog zerolog.Logger, opts ...func(*stopWarnOptions)) func() {
Msgf("%sstill in progress", processId)
}
}
- }()
+ })
start := time.Now()
return func() {
localLog.Trace().TimeDiff("check duration", time.Now(), start).Msg("done")