This is an automated email from the ASF dual-hosted git repository. sruehl pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/plc4x.git
commit 57a36f8e1b1f8d1c9198051ddca983f233292f2c Author: Sebastian Rühl <[email protected]> AuthorDate: Mon Nov 24 18:37:49 2025 +0100 feat(plc4go/spi): reset connection on connection cache handout. --- plc4go/internal/ads/Connection.go | 4 ++ plc4go/internal/ads/MessageCodec.go | 4 ++ .../bacnetip/ApplicationLayerMessageCodec.go | 4 ++ plc4go/internal/bacnetip/Connection.go | 5 +++ plc4go/internal/bacnetip/MessageCodec.go | 4 ++ plc4go/internal/cbus/Connection.go | 5 +++ plc4go/internal/cbus/MessageCodec.go | 4 ++ plc4go/internal/cbus/mocks_test.go | 20 ++++++---- plc4go/internal/eip/Connection.go | 5 +++ plc4go/internal/eip/MessageCodec.go | 5 +++ plc4go/internal/knxnetip/Connection.go | 4 ++ plc4go/internal/knxnetip/MessageCodec.go | 5 +++ plc4go/internal/modbus/Connection.go | 4 ++ plc4go/internal/modbus/MessageCodec.go | 5 +++ plc4go/internal/opcua/Connection.go | 5 +++ plc4go/internal/opcua/MessageCodec.go | 4 ++ plc4go/internal/s7/Connection.go | 5 +++ plc4go/internal/s7/MessageCodec.go | 4 ++ plc4go/pkg/api/cache/PlcConnectionCache.go | 5 +++ plc4go/pkg/api/cache/connectionContainer.go | 6 +-- plc4go/pkg/api/logging/ZerologInterfaceMarshal.go | 3 +- plc4go/spi/default/mocks_test.go | 33 ++++++++++++++++ plc4go/spi/testutils/TestUtils.go | 2 +- plc4go/spi/testutils/mocks_test.go | 33 ++++++++++++++++ plc4go/spi/transactions/mocks_test.go | 45 ++++++++++++++-------- plc4go/spi/transports/TransportInstance.go | 2 + plc4go/spi/transports/mocks_test.go | 33 ++++++++++++++++ plc4go/spi/transports/pcap/TransportInstance.go | 4 ++ plc4go/spi/transports/serial/TransportInstance.go | 4 ++ plc4go/spi/transports/tcp/TransportInstance.go | 11 ++++++ plc4go/spi/transports/test/TransportInstance.go | 4 ++ plc4go/spi/transports/udp/TransportInstance.go | 12 ++++++ 32 files changed, 265 insertions(+), 28 deletions(-) diff --git a/plc4go/internal/ads/Connection.go b/plc4go/internal/ads/Connection.go index 5eed55d1fc..ef8a9884c2 100644 --- a/plc4go/internal/ads/Connection.go +++ b/plc4go/internal/ads/Connection.go @@ -64,6 +64,10 @@ type Connection struct { _options []options.WithOption // Used to pass them downstream } +var ( + _ spi.TransportInstanceExposer = (*Connection)(nil) +) + func NewConnection(messageCodec spi.MessageCodec, configuration model.Configuration, connectionOptions map[string][]string, _options ...options.WithOption) (*Connection, error) { driverContext, err := NewDriverContext(configuration) if err != nil { diff --git a/plc4go/internal/ads/MessageCodec.go b/plc4go/internal/ads/MessageCodec.go index 14fe68c2e7..f34ad29f62 100644 --- a/plc4go/internal/ads/MessageCodec.go +++ b/plc4go/internal/ads/MessageCodec.go @@ -42,6 +42,10 @@ type MessageCodec struct { log zerolog.Logger } +var ( + _ spi.TransportInstanceExposer = (*MessageCodec)(nil) +) + func NewMessageCodec(transportInstance transports.TransportInstance, _options ...options.WithOption) *MessageCodec { customLogger := options.ExtractCustomLoggerOrDefaultToGlobal(_options...) codec := &MessageCodec{ diff --git a/plc4go/internal/bacnetip/ApplicationLayerMessageCodec.go b/plc4go/internal/bacnetip/ApplicationLayerMessageCodec.go index 2685e3a202..81ce640b3d 100644 --- a/plc4go/internal/bacnetip/ApplicationLayerMessageCodec.go +++ b/plc4go/internal/bacnetip/ApplicationLayerMessageCodec.go @@ -56,6 +56,10 @@ type ApplicationLayerMessageCodec struct { log zerolog.Logger } +var ( + _ spi.TransportInstanceExposer = (*MessageCodec)(nil) +) + func NewApplicationLayerMessageCodec(localLog zerolog.Logger, udpTransport *udp.Transport, transportUrl url.URL, options map[string][]string, localAddress *net.UDPAddr, remoteAddress *net.UDPAddr) (*ApplicationLayerMessageCodec, error) { // TODO: currently this is done by the BIP down below // Have the transport create a new transport-instance. diff --git a/plc4go/internal/bacnetip/Connection.go b/plc4go/internal/bacnetip/Connection.go index 4084c214a9..1953341647 100644 --- a/plc4go/internal/bacnetip/Connection.go +++ b/plc4go/internal/bacnetip/Connection.go @@ -40,6 +40,7 @@ import ( type Connection struct { _default.DefaultConnection + invokeIdGenerator InvokeIdGenerator messageCodec spi.MessageCodec subscribers []*Subscriber @@ -54,6 +55,10 @@ type Connection struct { _options []options.WithOption // Used to pass them downstream } +var ( + _ spi.TransportInstanceExposer = (*Connection)(nil) +) + func NewConnection(messageCodec spi.MessageCodec, tagHandler spi.PlcTagHandler, tm transactions.RequestTransactionManager, connectionOptions map[string][]string, _options ...options.WithOption) *Connection { customLogger := options.ExtractCustomLoggerOrDefaultToGlobal(_options...) connection := &Connection{ diff --git a/plc4go/internal/bacnetip/MessageCodec.go b/plc4go/internal/bacnetip/MessageCodec.go index 09faf9c1af..f7a849d54f 100644 --- a/plc4go/internal/bacnetip/MessageCodec.go +++ b/plc4go/internal/bacnetip/MessageCodec.go @@ -40,6 +40,10 @@ type MessageCodec struct { log zerolog.Logger } +var ( + _ spi.TransportInstanceExposer = (*MessageCodec)(nil) +) + func NewMessageCodec(transportInstance transports.TransportInstance, _options ...options.WithOption) *MessageCodec { codec := &MessageCodec{} codec.DefaultCodec = _default.NewDefaultCodec(codec, transportInstance, append(_options, _default.WithCustomMessageHandler(codec.handleCustomMessage))...) diff --git a/plc4go/internal/cbus/Connection.go b/plc4go/internal/cbus/Connection.go index fd8c631629..c231c89067 100644 --- a/plc4go/internal/cbus/Connection.go +++ b/plc4go/internal/cbus/Connection.go @@ -60,6 +60,7 @@ func (t *AlphaGenerator) getAndIncrement() byte { //go:generate go tool plc4xGenerator -type=Connection type Connection struct { _default.DefaultConnection + alphaGenerator AlphaGenerator `stringer:"true"` messageCodec *MessageCodec subscribers []*Subscriber @@ -79,6 +80,10 @@ type Connection struct { _options []options.WithOption `ignore:"true"` // Used to pass them downstream } +var ( + _ spi.TransportInstanceExposer = (*Connection)(nil) +) + func NewConnection(messageCodec *MessageCodec, configuration Configuration, driverContext DriverContext, tagHandler spi.PlcTagHandler, tm transactions.RequestTransactionManager, connectionOptions map[string][]string, _options ...options.WithOption) *Connection { customLogger := options.ExtractCustomLoggerOrDefaultToGlobal(_options...) connection := &Connection{ diff --git a/plc4go/internal/cbus/MessageCodec.go b/plc4go/internal/cbus/MessageCodec.go index 68210bd707..0a83edecd0 100644 --- a/plc4go/internal/cbus/MessageCodec.go +++ b/plc4go/internal/cbus/MessageCodec.go @@ -56,6 +56,10 @@ type MessageCodec struct { log zerolog.Logger } +var ( + _ spi.TransportInstanceExposer = (*MessageCodec)(nil) +) + func NewMessageCodec(transportInstance transports.TransportInstance, _options ...options.WithOption) *MessageCodec { passLoggerToModel, _ := options.ExtractPassLoggerToModel(_options...) customLogger := options.ExtractCustomLoggerOrDefaultToGlobal(_options...) diff --git a/plc4go/internal/cbus/mocks_test.go b/plc4go/internal/cbus/mocks_test.go index 47067a7820..8068378a28 100644 --- a/plc4go/internal/cbus/mocks_test.go +++ b/plc4go/internal/cbus/mocks_test.go @@ -4237,7 +4237,7 @@ func (_c *MockRequestTransaction_String_Call) RunAndReturn(run func() string) *M // Submit provides a mock function for the type MockRequestTransaction func (_mock *MockRequestTransaction) Submit(operationInfo string, operation transactions.RequestTransactionRunnable) { - _mock.Called(operation) + _mock.Called(operationInfo, operation) return } @@ -4247,19 +4247,25 @@ type MockRequestTransaction_Submit_Call struct { } // Submit is a helper method to define mock.On call +// - operationInfo string // - operation transactions.RequestTransactionRunnable -func (_e *MockRequestTransaction_Expecter) Submit(operation interface{}) *MockRequestTransaction_Submit_Call { - return &MockRequestTransaction_Submit_Call{Call: _e.mock.On("Submit", operation)} +func (_e *MockRequestTransaction_Expecter) Submit(operationInfo interface{}, operation interface{}) *MockRequestTransaction_Submit_Call { + return &MockRequestTransaction_Submit_Call{Call: _e.mock.On("Submit", operationInfo, operation)} } -func (_c *MockRequestTransaction_Submit_Call) Run(run func(operation transactions.RequestTransactionRunnable)) *MockRequestTransaction_Submit_Call { +func (_c *MockRequestTransaction_Submit_Call) Run(run func(operationInfo string, operation transactions.RequestTransactionRunnable)) *MockRequestTransaction_Submit_Call { _c.Call.Run(func(args mock.Arguments) { - var arg0 transactions.RequestTransactionRunnable + var arg0 string if args[0] != nil { - arg0 = args[0].(transactions.RequestTransactionRunnable) + arg0 = args[0].(string) + } + var arg1 transactions.RequestTransactionRunnable + if args[1] != nil { + arg1 = args[1].(transactions.RequestTransactionRunnable) } run( arg0, + arg1, ) }) return _c @@ -4270,7 +4276,7 @@ func (_c *MockRequestTransaction_Submit_Call) Return() *MockRequestTransaction_S return _c } -func (_c *MockRequestTransaction_Submit_Call) RunAndReturn(run func(operation transactions.RequestTransactionRunnable)) *MockRequestTransaction_Submit_Call { +func (_c *MockRequestTransaction_Submit_Call) RunAndReturn(run func(operationInfo string, operation transactions.RequestTransactionRunnable)) *MockRequestTransaction_Submit_Call { _c.Run(run) return _c } diff --git a/plc4go/internal/eip/Connection.go b/plc4go/internal/eip/Connection.go index f960aef0ac..d4ef2c9253 100644 --- a/plc4go/internal/eip/Connection.go +++ b/plc4go/internal/eip/Connection.go @@ -48,6 +48,7 @@ const ( type Connection struct { _default.DefaultConnection + messageCodec spi.MessageCodec configuration Configuration driverContext DriverContext @@ -69,6 +70,10 @@ type Connection struct { _options []options.WithOption // Used to pass them downstream } +var ( + _ spi.TransportInstanceExposer = (*Connection)(nil) +) + func NewConnection( messageCodec spi.MessageCodec, configuration Configuration, diff --git a/plc4go/internal/eip/MessageCodec.go b/plc4go/internal/eip/MessageCodec.go index 68cfaec457..856338c8be 100644 --- a/plc4go/internal/eip/MessageCodec.go +++ b/plc4go/internal/eip/MessageCodec.go @@ -37,11 +37,16 @@ import ( //go:generate go tool plc4xGenerator -type=MessageCodec type MessageCodec struct { _default.DefaultCodec + none bool // TODO: just a empty field to satisfy generator (needs fixing because in this case here we have the delegate) log zerolog.Logger } +var ( + _ spi.TransportInstanceExposer = (*MessageCodec)(nil) +) + func NewMessageCodec(transportInstance transports.TransportInstance, _options ...options.WithOption) *MessageCodec { customLogger := options.ExtractCustomLoggerOrDefaultToGlobal(_options...) codec := &MessageCodec{ diff --git a/plc4go/internal/knxnetip/Connection.go b/plc4go/internal/knxnetip/Connection.go index 61630f9fc2..363c21e71f 100644 --- a/plc4go/internal/knxnetip/Connection.go +++ b/plc4go/internal/knxnetip/Connection.go @@ -143,6 +143,10 @@ type Connection struct { _options []options.WithOption // Used to pass them downstream } +var ( + _ spi.TransportInstanceExposer = (*Connection)(nil) +) + func (m *Connection) String() string { return fmt.Sprintf("knx.Connection{}") } diff --git a/plc4go/internal/knxnetip/MessageCodec.go b/plc4go/internal/knxnetip/MessageCodec.go index f5aa902781..56eec608d3 100644 --- a/plc4go/internal/knxnetip/MessageCodec.go +++ b/plc4go/internal/knxnetip/MessageCodec.go @@ -35,6 +35,7 @@ import ( //go:generate go tool plc4xGenerator -type=MessageCodec type MessageCodec struct { _default.DefaultCodec + sequenceCounter int32 messageInterceptor func(message spi.Message) @@ -42,6 +43,10 @@ type MessageCodec struct { log zerolog.Logger } +var ( + _ spi.TransportInstanceExposer = (*MessageCodec)(nil) +) + func NewMessageCodec(transportInstance transports.TransportInstance, messageInterceptor func(message spi.Message), _options ...options.WithOption) *MessageCodec { passLoggerToModel, _ := options.ExtractPassLoggerToModel(_options...) customLogger := options.ExtractCustomLoggerOrDefaultToGlobal(_options...) diff --git a/plc4go/internal/modbus/Connection.go b/plc4go/internal/modbus/Connection.go index 26003642dd..b2a3eb0d86 100644 --- a/plc4go/internal/modbus/Connection.go +++ b/plc4go/internal/modbus/Connection.go @@ -55,6 +55,10 @@ type Connection struct { _options []options.WithOption // Used to pass them downstream } +var ( + _ spi.TransportInstanceExposer = (*Connection)(nil) +) + func NewConnection(unitIdentifier uint8, messageCodec spi.MessageCodec, connectionOptions map[string][]string, tagHandler spi.PlcTagHandler, _options ...options.WithOption) *Connection { customLogger := options.ExtractCustomLoggerOrDefaultToGlobal(_options...) connection := &Connection{ diff --git a/plc4go/internal/modbus/MessageCodec.go b/plc4go/internal/modbus/MessageCodec.go index 0e88064ebf..b6e61e0399 100644 --- a/plc4go/internal/modbus/MessageCodec.go +++ b/plc4go/internal/modbus/MessageCodec.go @@ -36,6 +36,7 @@ import ( //go:generate go tool plc4xGenerator -type=MessageCodec type MessageCodec struct { _default.DefaultCodec + expectationCounter int32 passLogToModel bool @@ -43,6 +44,10 @@ type MessageCodec struct { log zerolog.Logger } +var ( + _ spi.TransportInstanceExposer = (*MessageCodec)(nil) +) + func NewMessageCodec(transportInstance transports.TransportInstance, _options ...options.WithOption) *MessageCodec { passLoggerToModel, _ := options.ExtractPassLoggerToModel(_options...) customLogger := options.ExtractCustomLoggerOrDefaultToGlobal(_options...) diff --git a/plc4go/internal/opcua/Connection.go b/plc4go/internal/opcua/Connection.go index 7012b667bc..352c7994a3 100644 --- a/plc4go/internal/opcua/Connection.go +++ b/plc4go/internal/opcua/Connection.go @@ -39,6 +39,7 @@ import ( //go:generate go tool plc4xGenerator -type=Connection type Connection struct { _default.DefaultConnection + messageCodec *MessageCodec subscribers []*Subscriber @@ -61,6 +62,10 @@ type Connection struct { _options []options.WithOption `ignore:"true"` // Used to pass them downstream } +var ( + _ spi.TransportInstanceExposer = (*Connection)(nil) +) + func NewConnection(messageCodec *MessageCodec, configuration Configuration, driverContext DriverContext, tagHandler spi.PlcTagHandler, connectionOptions map[string][]string, _options ...options.WithOption) *Connection { customLogger := options.ExtractCustomLoggerOrDefaultToGlobal(_options...) connection := &Connection{ diff --git a/plc4go/internal/opcua/MessageCodec.go b/plc4go/internal/opcua/MessageCodec.go index a2f307a526..8488fcb8e0 100644 --- a/plc4go/internal/opcua/MessageCodec.go +++ b/plc4go/internal/opcua/MessageCodec.go @@ -45,6 +45,10 @@ type MessageCodec struct { log zerolog.Logger } +var ( + _ spi.TransportInstanceExposer = (*MessageCodec)(nil) +) + func NewMessageCodec(transportInstance transports.TransportInstance, _options ...options.WithOption) *MessageCodec { passLoggerToModel, _ := options.ExtractPassLoggerToModel(_options...) customLogger := options.ExtractCustomLoggerOrDefaultToGlobal(_options...) diff --git a/plc4go/internal/s7/Connection.go b/plc4go/internal/s7/Connection.go index 69e4a9d638..7bc42c0909 100644 --- a/plc4go/internal/s7/Connection.go +++ b/plc4go/internal/s7/Connection.go @@ -59,6 +59,7 @@ func (t *TpduGenerator) getAndIncrement() uint16 { type Connection struct { _default.DefaultConnection + tpduGenerator TpduGenerator messageCodec spi.MessageCodec configuration Configuration @@ -74,6 +75,10 @@ type Connection struct { _options []options.WithOption // Used to pass them downstream } +var ( + _ spi.TransportInstanceExposer = (*Connection)(nil) +) + func NewConnection(messageCodec spi.MessageCodec, configuration Configuration, driverContext DriverContext, tagHandler spi.PlcTagHandler, tm transactions.RequestTransactionManager, connectionOptions map[string][]string, _options ...options.WithOption) *Connection { customLogger := options.ExtractCustomLoggerOrDefaultToGlobal(_options...) connection := &Connection{ diff --git a/plc4go/internal/s7/MessageCodec.go b/plc4go/internal/s7/MessageCodec.go index 924cf45908..7a7d68f16e 100644 --- a/plc4go/internal/s7/MessageCodec.go +++ b/plc4go/internal/s7/MessageCodec.go @@ -40,6 +40,10 @@ type MessageCodec struct { log zerolog.Logger } +var ( + _ spi.TransportInstanceExposer = (*MessageCodec)(nil) +) + func NewMessageCodec(transportInstance transports.TransportInstance, _options ...options.WithOption) *MessageCodec { passLoggerToModel, _ := options.ExtractPassLoggerToModel(_options...) extractCustomLogger := options.ExtractCustomLoggerOrDefaultToGlobal(_options...) diff --git a/plc4go/pkg/api/cache/PlcConnectionCache.go b/plc4go/pkg/api/cache/PlcConnectionCache.go index 1022e6e2ab..4ff7a58442 100644 --- a/plc4go/pkg/api/cache/PlcConnectionCache.go +++ b/plc4go/pkg/api/cache/PlcConnectionCache.go @@ -32,6 +32,7 @@ import ( "github.com/apache/plc4x/plc4go/pkg/api" "github.com/apache/plc4x/plc4go/pkg/api/config" + "github.com/apache/plc4x/plc4go/spi" "github.com/apache/plc4x/plc4go/spi/options" "github.com/apache/plc4x/plc4go/spi/tracer" ) @@ -192,6 +193,10 @@ func (c *plcConnectionCache) GetConnection(ctx context.Context, connectionString if c.tracer != nil { c.tracer.AddTransactionalTrace(txId, "get-connection", "success") } + if tie, ok := conn.connection.(spi.TransportInstanceExposer); ok { + c.log.Trace().Msg("Resetting transport instance") + tie.GetTransportInstance().Reset() + } return conn, nil case err := <-errChan: diff --git a/plc4go/pkg/api/cache/connectionContainer.go b/plc4go/pkg/api/cache/connectionContainer.go index 1f7f7dba31..20776b5742 100644 --- a/plc4go/pkg/api/cache/connectionContainer.go +++ b/plc4go/pkg/api/cache/connectionContainer.go @@ -50,7 +50,7 @@ type connectionContainer struct { type connectionRequest struct { ctx context.Context - connChan chan plc4go.PlcConnection + connChan chan *plcConnectionLease errChan chan error } @@ -153,11 +153,11 @@ func (c *connectionContainer) addListener(listener connectionListener) { c.listeners = append(c.listeners, listener) } -func (c *connectionContainer) lease(ctx context.Context) (chan plc4go.PlcConnection, chan error) { +func (c *connectionContainer) lease(ctx context.Context) (chan *plcConnectionLease, chan error) { c.lock.Lock() defer c.lock.Unlock() - connectionChan := make(chan plc4go.PlcConnection, 1) + connectionChan := make(chan *plcConnectionLease, 1) errorChan := make(chan error, 1) // Check if the connection is available. switch c.state { diff --git a/plc4go/pkg/api/logging/ZerologInterfaceMarshal.go b/plc4go/pkg/api/logging/ZerologInterfaceMarshal.go index 923719d902..d18bef5cb8 100644 --- a/plc4go/pkg/api/logging/ZerologInterfaceMarshal.go +++ b/plc4go/pkg/api/logging/ZerologInterfaceMarshal.go @@ -24,9 +24,10 @@ import ( "context" "encoding/json" - "github.com/apache/plc4x/plc4go/spi/utils" "github.com/pkg/errors" "github.com/rs/zerolog" + + "github.com/apache/plc4x/plc4go/spi/utils" ) // PLCMessageFormat defines the format of the PLCMessage that is logged diff --git a/plc4go/spi/default/mocks_test.go b/plc4go/spi/default/mocks_test.go index 96cbeece1b..9b37620105 100644 --- a/plc4go/spi/default/mocks_test.go +++ b/plc4go/spi/default/mocks_test.go @@ -4707,6 +4707,39 @@ func (_c *MockTransportInstance_Read_Call) RunAndReturn(run func(ctx context.Con return _c } +// Reset provides a mock function for the type MockTransportInstance +func (_mock *MockTransportInstance) Reset() { + _mock.Called() + return +} + +// MockTransportInstance_Reset_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Reset' +type MockTransportInstance_Reset_Call struct { + *mock.Call +} + +// Reset is a helper method to define mock.On call +func (_e *MockTransportInstance_Expecter) Reset() *MockTransportInstance_Reset_Call { + return &MockTransportInstance_Reset_Call{Call: _e.mock.On("Reset")} +} + +func (_c *MockTransportInstance_Reset_Call) Run(run func()) *MockTransportInstance_Reset_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *MockTransportInstance_Reset_Call) Return() *MockTransportInstance_Reset_Call { + _c.Call.Return() + return _c +} + +func (_c *MockTransportInstance_Reset_Call) RunAndReturn(run func()) *MockTransportInstance_Reset_Call { + _c.Run(run) + return _c +} + // String provides a mock function for the type MockTransportInstance func (_mock *MockTransportInstance) String() string { ret := _mock.Called() diff --git a/plc4go/spi/testutils/TestUtils.go b/plc4go/spi/testutils/TestUtils.go index d2ebb9498c..9a4b370b30 100644 --- a/plc4go/spi/testutils/TestUtils.go +++ b/plc4go/spi/testutils/TestUtils.go @@ -34,7 +34,6 @@ import ( "github.com/ajankovic/xdiff" "github.com/ajankovic/xdiff/parser" - "github.com/apache/plc4x/plc4go/pkg/api/logging" "github.com/fatih/color" "github.com/pkg/errors" "github.com/rs/zerolog" @@ -42,6 +41,7 @@ import ( "github.com/rs/zerolog/pkgerrors" "github.com/stretchr/testify/assert" + "github.com/apache/plc4x/plc4go/pkg/api/logging" "github.com/apache/plc4x/plc4go/spi/options" "github.com/apache/plc4x/plc4go/spi/pool" "github.com/apache/plc4x/plc4go/spi/transactions" diff --git a/plc4go/spi/testutils/mocks_test.go b/plc4go/spi/testutils/mocks_test.go index d3054b67ae..1f5f9fd557 100644 --- a/plc4go/spi/testutils/mocks_test.go +++ b/plc4go/spi/testutils/mocks_test.go @@ -690,6 +690,39 @@ func (_c *MockTestTransportInstance_Read_Call) RunAndReturn(run func(ctx context return _c } +// Reset provides a mock function for the type MockTestTransportInstance +func (_mock *MockTestTransportInstance) Reset() { + _mock.Called() + return +} + +// MockTestTransportInstance_Reset_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Reset' +type MockTestTransportInstance_Reset_Call struct { + *mock.Call +} + +// Reset is a helper method to define mock.On call +func (_e *MockTestTransportInstance_Expecter) Reset() *MockTestTransportInstance_Reset_Call { + return &MockTestTransportInstance_Reset_Call{Call: _e.mock.On("Reset")} +} + +func (_c *MockTestTransportInstance_Reset_Call) Run(run func()) *MockTestTransportInstance_Reset_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *MockTestTransportInstance_Reset_Call) Return() *MockTestTransportInstance_Reset_Call { + _c.Call.Return() + return _c +} + +func (_c *MockTestTransportInstance_Reset_Call) RunAndReturn(run func()) *MockTestTransportInstance_Reset_Call { + _c.Run(run) + return _c +} + // String provides a mock function for the type MockTestTransportInstance func (_mock *MockTestTransportInstance) String() string { ret := _mock.Called() diff --git a/plc4go/spi/transactions/mocks_test.go b/plc4go/spi/transactions/mocks_test.go index c223abb423..687dd15ad2 100644 --- a/plc4go/spi/transactions/mocks_test.go +++ b/plc4go/spi/transactions/mocks_test.go @@ -294,7 +294,7 @@ func (_c *MockRequestTransaction_String_Call) RunAndReturn(run func() string) *M // Submit provides a mock function for the type MockRequestTransaction func (_mock *MockRequestTransaction) Submit(operationInfo string, operation RequestTransactionRunnable) { - _mock.Called(operation) + _mock.Called(operationInfo, operation) return } @@ -304,19 +304,25 @@ type MockRequestTransaction_Submit_Call struct { } // Submit is a helper method to define mock.On call +// - operationInfo string // - operation RequestTransactionRunnable -func (_e *MockRequestTransaction_Expecter) Submit(operation interface{}) *MockRequestTransaction_Submit_Call { - return &MockRequestTransaction_Submit_Call{Call: _e.mock.On("Submit", operation)} +func (_e *MockRequestTransaction_Expecter) Submit(operationInfo interface{}, operation interface{}) *MockRequestTransaction_Submit_Call { + return &MockRequestTransaction_Submit_Call{Call: _e.mock.On("Submit", operationInfo, operation)} } -func (_c *MockRequestTransaction_Submit_Call) Run(run func(operation RequestTransactionRunnable)) *MockRequestTransaction_Submit_Call { +func (_c *MockRequestTransaction_Submit_Call) Run(run func(operationInfo string, operation RequestTransactionRunnable)) *MockRequestTransaction_Submit_Call { _c.Call.Run(func(args mock.Arguments) { - var arg0 RequestTransactionRunnable + var arg0 string if args[0] != nil { - arg0 = args[0].(RequestTransactionRunnable) + arg0 = args[0].(string) + } + var arg1 RequestTransactionRunnable + if args[1] != nil { + arg1 = args[1].(RequestTransactionRunnable) } run( arg0, + arg1, ) }) return _c @@ -327,7 +333,7 @@ func (_c *MockRequestTransaction_Submit_Call) Return() *MockRequestTransaction_S return _c } -func (_c *MockRequestTransaction_Submit_Call) RunAndReturn(run func(operation RequestTransactionRunnable)) *MockRequestTransaction_Submit_Call { +func (_c *MockRequestTransaction_Submit_Call) RunAndReturn(run func(operationInfo string, operation RequestTransactionRunnable)) *MockRequestTransaction_Submit_Call { _c.Run(run) return _c } @@ -495,16 +501,16 @@ func (_c *MockRequestTransactionManager_SetNumberOfConcurrentRequests_Call) RunA } // StartTransaction provides a mock function for the type MockRequestTransactionManager -func (_mock *MockRequestTransactionManager) StartTransaction(string) RequestTransaction { - ret := _mock.Called() +func (_mock *MockRequestTransactionManager) StartTransaction(transactionInfo string) RequestTransaction { + ret := _mock.Called(transactionInfo) if len(ret) == 0 { panic("no return value specified for StartTransaction") } var r0 RequestTransaction - if returnFunc, ok := ret.Get(0).(func() RequestTransaction); ok { - r0 = returnFunc() + if returnFunc, ok := ret.Get(0).(func(string) RequestTransaction); ok { + r0 = returnFunc(transactionInfo) } else { if ret.Get(0) != nil { r0 = ret.Get(0).(RequestTransaction) @@ -519,13 +525,20 @@ type MockRequestTransactionManager_StartTransaction_Call struct { } // StartTransaction is a helper method to define mock.On call -func (_e *MockRequestTransactionManager_Expecter) StartTransaction() *MockRequestTransactionManager_StartTransaction_Call { - return &MockRequestTransactionManager_StartTransaction_Call{Call: _e.mock.On("StartTransaction")} +// - transactionInfo string +func (_e *MockRequestTransactionManager_Expecter) StartTransaction(transactionInfo interface{}) *MockRequestTransactionManager_StartTransaction_Call { + return &MockRequestTransactionManager_StartTransaction_Call{Call: _e.mock.On("StartTransaction", transactionInfo)} } -func (_c *MockRequestTransactionManager_StartTransaction_Call) Run(run func()) *MockRequestTransactionManager_StartTransaction_Call { +func (_c *MockRequestTransactionManager_StartTransaction_Call) Run(run func(transactionInfo string)) *MockRequestTransactionManager_StartTransaction_Call { _c.Call.Run(func(args mock.Arguments) { - run() + var arg0 string + if args[0] != nil { + arg0 = args[0].(string) + } + run( + arg0, + ) }) return _c } @@ -535,7 +548,7 @@ func (_c *MockRequestTransactionManager_StartTransaction_Call) Return(requestTra return _c } -func (_c *MockRequestTransactionManager_StartTransaction_Call) RunAndReturn(run func() RequestTransaction) *MockRequestTransactionManager_StartTransaction_Call { +func (_c *MockRequestTransactionManager_StartTransaction_Call) RunAndReturn(run func(transactionInfo string) RequestTransaction) *MockRequestTransactionManager_StartTransaction_Call { _c.Call.Return(run) return _c } diff --git a/plc4go/spi/transports/TransportInstance.go b/plc4go/spi/transports/TransportInstance.go index b49f0a8904..291523f53a 100644 --- a/plc4go/spi/transports/TransportInstance.go +++ b/plc4go/spi/transports/TransportInstance.go @@ -45,4 +45,6 @@ type TransportInstance interface { Read(ctx context.Context, numBytes uint32) ([]byte, error) // Write writes data to the transport Write(ctx context.Context, data []byte) error + // Reset resets the transport instance + Reset() } diff --git a/plc4go/spi/transports/mocks_test.go b/plc4go/spi/transports/mocks_test.go index b21396a3d3..53a6cde7cd 100644 --- a/plc4go/spi/transports/mocks_test.go +++ b/plc4go/spi/transports/mocks_test.go @@ -931,6 +931,39 @@ func (_c *MockTransportInstance_Read_Call) RunAndReturn(run func(ctx context.Con return _c } +// Reset provides a mock function for the type MockTransportInstance +func (_mock *MockTransportInstance) Reset() { + _mock.Called() + return +} + +// MockTransportInstance_Reset_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Reset' +type MockTransportInstance_Reset_Call struct { + *mock.Call +} + +// Reset is a helper method to define mock.On call +func (_e *MockTransportInstance_Expecter) Reset() *MockTransportInstance_Reset_Call { + return &MockTransportInstance_Reset_Call{Call: _e.mock.On("Reset")} +} + +func (_c *MockTransportInstance_Reset_Call) Run(run func()) *MockTransportInstance_Reset_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *MockTransportInstance_Reset_Call) Return() *MockTransportInstance_Reset_Call { + _c.Call.Return() + return _c +} + +func (_c *MockTransportInstance_Reset_Call) RunAndReturn(run func()) *MockTransportInstance_Reset_Call { + _c.Run(run) + return _c +} + // String provides a mock function for the type MockTransportInstance func (_mock *MockTransportInstance) String() string { ret := _mock.Called() diff --git a/plc4go/spi/transports/pcap/TransportInstance.go b/plc4go/spi/transports/pcap/TransportInstance.go index 40a2218fa3..5043d20ee8 100644 --- a/plc4go/spi/transports/pcap/TransportInstance.go +++ b/plc4go/spi/transports/pcap/TransportInstance.go @@ -169,6 +169,10 @@ func (m *TransportInstance) Connect(ctx context.Context) error { return nil } +func (m *TransportInstance) Reset() { + // No-Op +} + func (m *TransportInstance) Close() error { defer utils.StopWarn(m.log)() m.stateChangeMutex.Lock() diff --git a/plc4go/spi/transports/serial/TransportInstance.go b/plc4go/spi/transports/serial/TransportInstance.go index cfcdeaf118..9edad672f6 100644 --- a/plc4go/spi/transports/serial/TransportInstance.go +++ b/plc4go/spi/transports/serial/TransportInstance.go @@ -98,6 +98,10 @@ func (m *TransportInstance) Connect(ctx context.Context) error { return nil } +func (m *TransportInstance) Reset() { + // No-Op +} + func (m *TransportInstance) Close() error { defer utils.StopWarn(m.log)() m.stateChangeMutex.Lock() diff --git a/plc4go/spi/transports/tcp/TransportInstance.go b/plc4go/spi/transports/tcp/TransportInstance.go index 5e442bc781..1ef966cbe2 100644 --- a/plc4go/spi/transports/tcp/TransportInstance.go +++ b/plc4go/spi/transports/tcp/TransportInstance.go @@ -94,6 +94,17 @@ func (m *TransportInstance) Connect(ctx context.Context) error { return nil } +func (m *TransportInstance) Reset() { + if m.tcpConn == nil { + m.log.Trace().Msg("No connection to reset") + return + } + _ = m.tcpConn.SetReadDeadline(time.Now().Add(1)) + _, _ = m.tcpConn.Read(make([]byte, 4096)) + m.reader = bufio.NewReader(m.tcpConn) + m.log.Trace().Msg("Connection reset") +} + func (m *TransportInstance) Close() error { defer utils.StopWarn(m.log)() m.stateChangeMutex.Lock() diff --git a/plc4go/spi/transports/test/TransportInstance.go b/plc4go/spi/transports/test/TransportInstance.go index 70a39b504f..9bbdbceb5e 100644 --- a/plc4go/spi/transports/test/TransportInstance.go +++ b/plc4go/spi/transports/test/TransportInstance.go @@ -318,6 +318,10 @@ func (m *TransportInstance) DrainWriteBuffer(numBytes uint32) []byte { return data } +func (m *TransportInstance) Reset() { + // No-Op +} + func (m *TransportInstance) String() string { return "test" } diff --git a/plc4go/spi/transports/udp/TransportInstance.go b/plc4go/spi/transports/udp/TransportInstance.go index 33ce22b4fb..a708f52cb7 100644 --- a/plc4go/spi/transports/udp/TransportInstance.go +++ b/plc4go/spi/transports/udp/TransportInstance.go @@ -26,6 +26,7 @@ import ( "net" "sync" "sync/atomic" + "time" "github.com/libp2p/go-reuseport" "github.com/pkg/errors" @@ -121,6 +122,17 @@ func (m *TransportInstance) Connect(ctx context.Context) error { return nil } +func (m *TransportInstance) Reset() { + if m.udpConn == nil { + m.log.Trace().Msg("No connection to reset") + return + } + _ = m.udpConn.SetReadDeadline(time.Now().Add(1)) + _, _, _ = m.udpConn.ReadFromUDP(make([]byte, 4096)) + m.reader = bufio.NewReader(m.udpConn) + m.log.Trace().Msg("Connection reset") +} + func (m *TransportInstance) Close() error { defer utils.StopWarn(m.log)() m.stateChangeMutex.Lock()
