This is an automated email from the ASF dual-hosted git repository.

sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git

commit 57a36f8e1b1f8d1c9198051ddca983f233292f2c
Author: Sebastian Rühl <[email protected]>
AuthorDate: Mon Nov 24 18:37:49 2025 +0100

    feat(plc4go/spi): reset connection on connection cache handout.
---
 plc4go/internal/ads/Connection.go                  |  4 ++
 plc4go/internal/ads/MessageCodec.go                |  4 ++
 .../bacnetip/ApplicationLayerMessageCodec.go       |  4 ++
 plc4go/internal/bacnetip/Connection.go             |  5 +++
 plc4go/internal/bacnetip/MessageCodec.go           |  4 ++
 plc4go/internal/cbus/Connection.go                 |  5 +++
 plc4go/internal/cbus/MessageCodec.go               |  4 ++
 plc4go/internal/cbus/mocks_test.go                 | 20 ++++++----
 plc4go/internal/eip/Connection.go                  |  5 +++
 plc4go/internal/eip/MessageCodec.go                |  5 +++
 plc4go/internal/knxnetip/Connection.go             |  4 ++
 plc4go/internal/knxnetip/MessageCodec.go           |  5 +++
 plc4go/internal/modbus/Connection.go               |  4 ++
 plc4go/internal/modbus/MessageCodec.go             |  5 +++
 plc4go/internal/opcua/Connection.go                |  5 +++
 plc4go/internal/opcua/MessageCodec.go              |  4 ++
 plc4go/internal/s7/Connection.go                   |  5 +++
 plc4go/internal/s7/MessageCodec.go                 |  4 ++
 plc4go/pkg/api/cache/PlcConnectionCache.go         |  5 +++
 plc4go/pkg/api/cache/connectionContainer.go        |  6 +--
 plc4go/pkg/api/logging/ZerologInterfaceMarshal.go  |  3 +-
 plc4go/spi/default/mocks_test.go                   | 33 ++++++++++++++++
 plc4go/spi/testutils/TestUtils.go                  |  2 +-
 plc4go/spi/testutils/mocks_test.go                 | 33 ++++++++++++++++
 plc4go/spi/transactions/mocks_test.go              | 45 ++++++++++++++--------
 plc4go/spi/transports/TransportInstance.go         |  2 +
 plc4go/spi/transports/mocks_test.go                | 33 ++++++++++++++++
 plc4go/spi/transports/pcap/TransportInstance.go    |  4 ++
 plc4go/spi/transports/serial/TransportInstance.go  |  4 ++
 plc4go/spi/transports/tcp/TransportInstance.go     | 11 ++++++
 plc4go/spi/transports/test/TransportInstance.go    |  4 ++
 plc4go/spi/transports/udp/TransportInstance.go     | 12 ++++++
 32 files changed, 265 insertions(+), 28 deletions(-)

diff --git a/plc4go/internal/ads/Connection.go 
b/plc4go/internal/ads/Connection.go
index 5eed55d1fc..ef8a9884c2 100644
--- a/plc4go/internal/ads/Connection.go
+++ b/plc4go/internal/ads/Connection.go
@@ -64,6 +64,10 @@ type Connection struct {
        _options       []options.WithOption // Used to pass them downstream
 }
 
+var (
+       _ spi.TransportInstanceExposer = (*Connection)(nil)
+)
+
 func NewConnection(messageCodec spi.MessageCodec, configuration 
model.Configuration, connectionOptions map[string][]string, _options 
...options.WithOption) (*Connection, error) {
        driverContext, err := NewDriverContext(configuration)
        if err != nil {
diff --git a/plc4go/internal/ads/MessageCodec.go 
b/plc4go/internal/ads/MessageCodec.go
index 14fe68c2e7..f34ad29f62 100644
--- a/plc4go/internal/ads/MessageCodec.go
+++ b/plc4go/internal/ads/MessageCodec.go
@@ -42,6 +42,10 @@ type MessageCodec struct {
        log zerolog.Logger
 }
 
+var (
+       _ spi.TransportInstanceExposer = (*MessageCodec)(nil)
+)
+
 func NewMessageCodec(transportInstance transports.TransportInstance, _options 
...options.WithOption) *MessageCodec {
        customLogger := 
options.ExtractCustomLoggerOrDefaultToGlobal(_options...)
        codec := &MessageCodec{
diff --git a/plc4go/internal/bacnetip/ApplicationLayerMessageCodec.go 
b/plc4go/internal/bacnetip/ApplicationLayerMessageCodec.go
index 2685e3a202..81ce640b3d 100644
--- a/plc4go/internal/bacnetip/ApplicationLayerMessageCodec.go
+++ b/plc4go/internal/bacnetip/ApplicationLayerMessageCodec.go
@@ -56,6 +56,10 @@ type ApplicationLayerMessageCodec struct {
        log zerolog.Logger
 }
 
+var (
+       _ spi.TransportInstanceExposer = (*MessageCodec)(nil)
+)
+
 func NewApplicationLayerMessageCodec(localLog zerolog.Logger, udpTransport 
*udp.Transport, transportUrl url.URL, options map[string][]string, localAddress 
*net.UDPAddr, remoteAddress *net.UDPAddr) (*ApplicationLayerMessageCodec, 
error) {
        // TODO: currently this is done by the BIP down below
        // Have the transport create a new transport-instance.
diff --git a/plc4go/internal/bacnetip/Connection.go 
b/plc4go/internal/bacnetip/Connection.go
index 4084c214a9..1953341647 100644
--- a/plc4go/internal/bacnetip/Connection.go
+++ b/plc4go/internal/bacnetip/Connection.go
@@ -40,6 +40,7 @@ import (
 
 type Connection struct {
        _default.DefaultConnection
+
        invokeIdGenerator InvokeIdGenerator
        messageCodec      spi.MessageCodec
        subscribers       []*Subscriber
@@ -54,6 +55,10 @@ type Connection struct {
        _options []options.WithOption // Used to pass them downstream
 }
 
+var (
+       _ spi.TransportInstanceExposer = (*Connection)(nil)
+)
+
 func NewConnection(messageCodec spi.MessageCodec, tagHandler 
spi.PlcTagHandler, tm transactions.RequestTransactionManager, connectionOptions 
map[string][]string, _options ...options.WithOption) *Connection {
        customLogger := 
options.ExtractCustomLoggerOrDefaultToGlobal(_options...)
        connection := &Connection{
diff --git a/plc4go/internal/bacnetip/MessageCodec.go 
b/plc4go/internal/bacnetip/MessageCodec.go
index 09faf9c1af..f7a849d54f 100644
--- a/plc4go/internal/bacnetip/MessageCodec.go
+++ b/plc4go/internal/bacnetip/MessageCodec.go
@@ -40,6 +40,10 @@ type MessageCodec struct {
        log            zerolog.Logger
 }
 
+var (
+       _ spi.TransportInstanceExposer = (*MessageCodec)(nil)
+)
+
 func NewMessageCodec(transportInstance transports.TransportInstance, _options 
...options.WithOption) *MessageCodec {
        codec := &MessageCodec{}
        codec.DefaultCodec = _default.NewDefaultCodec(codec, transportInstance, 
append(_options, 
_default.WithCustomMessageHandler(codec.handleCustomMessage))...)
diff --git a/plc4go/internal/cbus/Connection.go 
b/plc4go/internal/cbus/Connection.go
index fd8c631629..c231c89067 100644
--- a/plc4go/internal/cbus/Connection.go
+++ b/plc4go/internal/cbus/Connection.go
@@ -60,6 +60,7 @@ func (t *AlphaGenerator) getAndIncrement() byte {
 //go:generate go tool plc4xGenerator -type=Connection
 type Connection struct {
        _default.DefaultConnection
+
        alphaGenerator AlphaGenerator `stringer:"true"`
        messageCodec   *MessageCodec
        subscribers    []*Subscriber
@@ -79,6 +80,10 @@ type Connection struct {
        _options []options.WithOption `ignore:"true"` // Used to pass them 
downstream
 }
 
+var (
+       _ spi.TransportInstanceExposer = (*Connection)(nil)
+)
+
 func NewConnection(messageCodec *MessageCodec, configuration Configuration, 
driverContext DriverContext, tagHandler spi.PlcTagHandler, tm 
transactions.RequestTransactionManager, connectionOptions map[string][]string, 
_options ...options.WithOption) *Connection {
        customLogger := 
options.ExtractCustomLoggerOrDefaultToGlobal(_options...)
        connection := &Connection{
diff --git a/plc4go/internal/cbus/MessageCodec.go 
b/plc4go/internal/cbus/MessageCodec.go
index 68210bd707..0a83edecd0 100644
--- a/plc4go/internal/cbus/MessageCodec.go
+++ b/plc4go/internal/cbus/MessageCodec.go
@@ -56,6 +56,10 @@ type MessageCodec struct {
        log            zerolog.Logger
 }
 
+var (
+       _ spi.TransportInstanceExposer = (*MessageCodec)(nil)
+)
+
 func NewMessageCodec(transportInstance transports.TransportInstance, _options 
...options.WithOption) *MessageCodec {
        passLoggerToModel, _ := options.ExtractPassLoggerToModel(_options...)
        customLogger := 
options.ExtractCustomLoggerOrDefaultToGlobal(_options...)
diff --git a/plc4go/internal/cbus/mocks_test.go 
b/plc4go/internal/cbus/mocks_test.go
index 47067a7820..8068378a28 100644
--- a/plc4go/internal/cbus/mocks_test.go
+++ b/plc4go/internal/cbus/mocks_test.go
@@ -4237,7 +4237,7 @@ func (_c *MockRequestTransaction_String_Call) 
RunAndReturn(run func() string) *M
 
 // Submit provides a mock function for the type MockRequestTransaction
 func (_mock *MockRequestTransaction) Submit(operationInfo string, operation 
transactions.RequestTransactionRunnable) {
-       _mock.Called(operation)
+       _mock.Called(operationInfo, operation)
        return
 }
 
@@ -4247,19 +4247,25 @@ type MockRequestTransaction_Submit_Call struct {
 }
 
 // Submit is a helper method to define mock.On call
+//   - operationInfo string
 //   - operation transactions.RequestTransactionRunnable
-func (_e *MockRequestTransaction_Expecter) Submit(operation interface{}) 
*MockRequestTransaction_Submit_Call {
-       return &MockRequestTransaction_Submit_Call{Call: _e.mock.On("Submit", 
operation)}
+func (_e *MockRequestTransaction_Expecter) Submit(operationInfo interface{}, 
operation interface{}) *MockRequestTransaction_Submit_Call {
+       return &MockRequestTransaction_Submit_Call{Call: _e.mock.On("Submit", 
operationInfo, operation)}
 }
 
-func (_c *MockRequestTransaction_Submit_Call) Run(run func(operation 
transactions.RequestTransactionRunnable)) *MockRequestTransaction_Submit_Call {
+func (_c *MockRequestTransaction_Submit_Call) Run(run func(operationInfo 
string, operation transactions.RequestTransactionRunnable)) 
*MockRequestTransaction_Submit_Call {
        _c.Call.Run(func(args mock.Arguments) {
-               var arg0 transactions.RequestTransactionRunnable
+               var arg0 string
                if args[0] != nil {
-                       arg0 = args[0].(transactions.RequestTransactionRunnable)
+                       arg0 = args[0].(string)
+               }
+               var arg1 transactions.RequestTransactionRunnable
+               if args[1] != nil {
+                       arg1 = args[1].(transactions.RequestTransactionRunnable)
                }
                run(
                        arg0,
+                       arg1,
                )
        })
        return _c
@@ -4270,7 +4276,7 @@ func (_c *MockRequestTransaction_Submit_Call) Return() 
*MockRequestTransaction_S
        return _c
 }
 
-func (_c *MockRequestTransaction_Submit_Call) RunAndReturn(run func(operation 
transactions.RequestTransactionRunnable)) *MockRequestTransaction_Submit_Call {
+func (_c *MockRequestTransaction_Submit_Call) RunAndReturn(run 
func(operationInfo string, operation transactions.RequestTransactionRunnable)) 
*MockRequestTransaction_Submit_Call {
        _c.Run(run)
        return _c
 }
diff --git a/plc4go/internal/eip/Connection.go 
b/plc4go/internal/eip/Connection.go
index f960aef0ac..d4ef2c9253 100644
--- a/plc4go/internal/eip/Connection.go
+++ b/plc4go/internal/eip/Connection.go
@@ -48,6 +48,7 @@ const (
 
 type Connection struct {
        _default.DefaultConnection
+
        messageCodec              spi.MessageCodec
        configuration             Configuration
        driverContext             DriverContext
@@ -69,6 +70,10 @@ type Connection struct {
        _options []options.WithOption // Used to pass them downstream
 }
 
+var (
+       _ spi.TransportInstanceExposer = (*Connection)(nil)
+)
+
 func NewConnection(
        messageCodec spi.MessageCodec,
        configuration Configuration,
diff --git a/plc4go/internal/eip/MessageCodec.go 
b/plc4go/internal/eip/MessageCodec.go
index 68cfaec457..856338c8be 100644
--- a/plc4go/internal/eip/MessageCodec.go
+++ b/plc4go/internal/eip/MessageCodec.go
@@ -37,11 +37,16 @@ import (
 //go:generate go tool plc4xGenerator -type=MessageCodec
 type MessageCodec struct {
        _default.DefaultCodec
+
        none bool // TODO: just a empty field to satisfy generator (needs 
fixing because in this case here we have the delegate)
 
        log zerolog.Logger
 }
 
+var (
+       _ spi.TransportInstanceExposer = (*MessageCodec)(nil)
+)
+
 func NewMessageCodec(transportInstance transports.TransportInstance, _options 
...options.WithOption) *MessageCodec {
        customLogger := 
options.ExtractCustomLoggerOrDefaultToGlobal(_options...)
        codec := &MessageCodec{
diff --git a/plc4go/internal/knxnetip/Connection.go 
b/plc4go/internal/knxnetip/Connection.go
index 61630f9fc2..363c21e71f 100644
--- a/plc4go/internal/knxnetip/Connection.go
+++ b/plc4go/internal/knxnetip/Connection.go
@@ -143,6 +143,10 @@ type Connection struct {
        _options       []options.WithOption // Used to pass them downstream
 }
 
+var (
+       _ spi.TransportInstanceExposer = (*Connection)(nil)
+)
+
 func (m *Connection) String() string {
        return fmt.Sprintf("knx.Connection{}")
 }
diff --git a/plc4go/internal/knxnetip/MessageCodec.go 
b/plc4go/internal/knxnetip/MessageCodec.go
index f5aa902781..56eec608d3 100644
--- a/plc4go/internal/knxnetip/MessageCodec.go
+++ b/plc4go/internal/knxnetip/MessageCodec.go
@@ -35,6 +35,7 @@ import (
 //go:generate go tool plc4xGenerator -type=MessageCodec
 type MessageCodec struct {
        _default.DefaultCodec
+
        sequenceCounter    int32
        messageInterceptor func(message spi.Message)
 
@@ -42,6 +43,10 @@ type MessageCodec struct {
        log            zerolog.Logger
 }
 
+var (
+       _ spi.TransportInstanceExposer = (*MessageCodec)(nil)
+)
+
 func NewMessageCodec(transportInstance transports.TransportInstance, 
messageInterceptor func(message spi.Message), _options ...options.WithOption) 
*MessageCodec {
        passLoggerToModel, _ := options.ExtractPassLoggerToModel(_options...)
        customLogger := 
options.ExtractCustomLoggerOrDefaultToGlobal(_options...)
diff --git a/plc4go/internal/modbus/Connection.go 
b/plc4go/internal/modbus/Connection.go
index 26003642dd..b2a3eb0d86 100644
--- a/plc4go/internal/modbus/Connection.go
+++ b/plc4go/internal/modbus/Connection.go
@@ -55,6 +55,10 @@ type Connection struct {
        _options []options.WithOption // Used to pass them downstream
 }
 
+var (
+       _ spi.TransportInstanceExposer = (*Connection)(nil)
+)
+
 func NewConnection(unitIdentifier uint8, messageCodec spi.MessageCodec, 
connectionOptions map[string][]string, tagHandler spi.PlcTagHandler, _options 
...options.WithOption) *Connection {
        customLogger := 
options.ExtractCustomLoggerOrDefaultToGlobal(_options...)
        connection := &Connection{
diff --git a/plc4go/internal/modbus/MessageCodec.go 
b/plc4go/internal/modbus/MessageCodec.go
index 0e88064ebf..b6e61e0399 100644
--- a/plc4go/internal/modbus/MessageCodec.go
+++ b/plc4go/internal/modbus/MessageCodec.go
@@ -36,6 +36,7 @@ import (
 //go:generate go tool plc4xGenerator -type=MessageCodec
 type MessageCodec struct {
        _default.DefaultCodec
+
        expectationCounter int32
 
        passLogToModel bool
@@ -43,6 +44,10 @@ type MessageCodec struct {
        log zerolog.Logger
 }
 
+var (
+       _ spi.TransportInstanceExposer = (*MessageCodec)(nil)
+)
+
 func NewMessageCodec(transportInstance transports.TransportInstance, _options 
...options.WithOption) *MessageCodec {
        passLoggerToModel, _ := options.ExtractPassLoggerToModel(_options...)
        customLogger := 
options.ExtractCustomLoggerOrDefaultToGlobal(_options...)
diff --git a/plc4go/internal/opcua/Connection.go 
b/plc4go/internal/opcua/Connection.go
index 7012b667bc..352c7994a3 100644
--- a/plc4go/internal/opcua/Connection.go
+++ b/plc4go/internal/opcua/Connection.go
@@ -39,6 +39,7 @@ import (
 //go:generate go tool plc4xGenerator -type=Connection
 type Connection struct {
        _default.DefaultConnection
+
        messageCodec *MessageCodec
        subscribers  []*Subscriber
 
@@ -61,6 +62,10 @@ type Connection struct {
        _options []options.WithOption `ignore:"true"` // Used to pass them 
downstream
 }
 
+var (
+       _ spi.TransportInstanceExposer = (*Connection)(nil)
+)
+
 func NewConnection(messageCodec *MessageCodec, configuration Configuration, 
driverContext DriverContext, tagHandler spi.PlcTagHandler, connectionOptions 
map[string][]string, _options ...options.WithOption) *Connection {
        customLogger := 
options.ExtractCustomLoggerOrDefaultToGlobal(_options...)
        connection := &Connection{
diff --git a/plc4go/internal/opcua/MessageCodec.go 
b/plc4go/internal/opcua/MessageCodec.go
index a2f307a526..8488fcb8e0 100644
--- a/plc4go/internal/opcua/MessageCodec.go
+++ b/plc4go/internal/opcua/MessageCodec.go
@@ -45,6 +45,10 @@ type MessageCodec struct {
        log            zerolog.Logger
 }
 
+var (
+       _ spi.TransportInstanceExposer = (*MessageCodec)(nil)
+)
+
 func NewMessageCodec(transportInstance transports.TransportInstance, _options 
...options.WithOption) *MessageCodec {
        passLoggerToModel, _ := options.ExtractPassLoggerToModel(_options...)
        customLogger := 
options.ExtractCustomLoggerOrDefaultToGlobal(_options...)
diff --git a/plc4go/internal/s7/Connection.go b/plc4go/internal/s7/Connection.go
index 69e4a9d638..7bc42c0909 100644
--- a/plc4go/internal/s7/Connection.go
+++ b/plc4go/internal/s7/Connection.go
@@ -59,6 +59,7 @@ func (t *TpduGenerator) getAndIncrement() uint16 {
 
 type Connection struct {
        _default.DefaultConnection
+
        tpduGenerator TpduGenerator
        messageCodec  spi.MessageCodec
        configuration Configuration
@@ -74,6 +75,10 @@ type Connection struct {
        _options []options.WithOption // Used to pass them downstream
 }
 
+var (
+       _ spi.TransportInstanceExposer = (*Connection)(nil)
+)
+
 func NewConnection(messageCodec spi.MessageCodec, configuration Configuration, 
driverContext DriverContext, tagHandler spi.PlcTagHandler, tm 
transactions.RequestTransactionManager, connectionOptions map[string][]string, 
_options ...options.WithOption) *Connection {
        customLogger := 
options.ExtractCustomLoggerOrDefaultToGlobal(_options...)
        connection := &Connection{
diff --git a/plc4go/internal/s7/MessageCodec.go 
b/plc4go/internal/s7/MessageCodec.go
index 924cf45908..7a7d68f16e 100644
--- a/plc4go/internal/s7/MessageCodec.go
+++ b/plc4go/internal/s7/MessageCodec.go
@@ -40,6 +40,10 @@ type MessageCodec struct {
        log            zerolog.Logger
 }
 
+var (
+       _ spi.TransportInstanceExposer = (*MessageCodec)(nil)
+)
+
 func NewMessageCodec(transportInstance transports.TransportInstance, _options 
...options.WithOption) *MessageCodec {
        passLoggerToModel, _ := options.ExtractPassLoggerToModel(_options...)
        extractCustomLogger := 
options.ExtractCustomLoggerOrDefaultToGlobal(_options...)
diff --git a/plc4go/pkg/api/cache/PlcConnectionCache.go 
b/plc4go/pkg/api/cache/PlcConnectionCache.go
index 1022e6e2ab..4ff7a58442 100644
--- a/plc4go/pkg/api/cache/PlcConnectionCache.go
+++ b/plc4go/pkg/api/cache/PlcConnectionCache.go
@@ -32,6 +32,7 @@ import (
 
        "github.com/apache/plc4x/plc4go/pkg/api"
        "github.com/apache/plc4x/plc4go/pkg/api/config"
+       "github.com/apache/plc4x/plc4go/spi"
        "github.com/apache/plc4x/plc4go/spi/options"
        "github.com/apache/plc4x/plc4go/spi/tracer"
 )
@@ -192,6 +193,10 @@ func (c *plcConnectionCache) GetConnection(ctx 
context.Context, connectionString
                if c.tracer != nil {
                        c.tracer.AddTransactionalTrace(txId, "get-connection", 
"success")
                }
+               if tie, ok := conn.connection.(spi.TransportInstanceExposer); 
ok {
+                       c.log.Trace().Msg("Resetting transport instance")
+                       tie.GetTransportInstance().Reset()
+               }
                return conn, nil
 
        case err := <-errChan:
diff --git a/plc4go/pkg/api/cache/connectionContainer.go 
b/plc4go/pkg/api/cache/connectionContainer.go
index 1f7f7dba31..20776b5742 100644
--- a/plc4go/pkg/api/cache/connectionContainer.go
+++ b/plc4go/pkg/api/cache/connectionContainer.go
@@ -50,7 +50,7 @@ type connectionContainer struct {
 
 type connectionRequest struct {
        ctx      context.Context
-       connChan chan plc4go.PlcConnection
+       connChan chan *plcConnectionLease
        errChan  chan error
 }
 
@@ -153,11 +153,11 @@ func (c *connectionContainer) addListener(listener 
connectionListener) {
        c.listeners = append(c.listeners, listener)
 }
 
-func (c *connectionContainer) lease(ctx context.Context) (chan 
plc4go.PlcConnection, chan error) {
+func (c *connectionContainer) lease(ctx context.Context) (chan 
*plcConnectionLease, chan error) {
        c.lock.Lock()
        defer c.lock.Unlock()
 
-       connectionChan := make(chan plc4go.PlcConnection, 1)
+       connectionChan := make(chan *plcConnectionLease, 1)
        errorChan := make(chan error, 1)
        // Check if the connection is available.
        switch c.state {
diff --git a/plc4go/pkg/api/logging/ZerologInterfaceMarshal.go 
b/plc4go/pkg/api/logging/ZerologInterfaceMarshal.go
index 923719d902..d18bef5cb8 100644
--- a/plc4go/pkg/api/logging/ZerologInterfaceMarshal.go
+++ b/plc4go/pkg/api/logging/ZerologInterfaceMarshal.go
@@ -24,9 +24,10 @@ import (
        "context"
        "encoding/json"
 
-       "github.com/apache/plc4x/plc4go/spi/utils"
        "github.com/pkg/errors"
        "github.com/rs/zerolog"
+
+       "github.com/apache/plc4x/plc4go/spi/utils"
 )
 
 // PLCMessageFormat defines the format of the PLCMessage that is logged
diff --git a/plc4go/spi/default/mocks_test.go b/plc4go/spi/default/mocks_test.go
index 96cbeece1b..9b37620105 100644
--- a/plc4go/spi/default/mocks_test.go
+++ b/plc4go/spi/default/mocks_test.go
@@ -4707,6 +4707,39 @@ func (_c *MockTransportInstance_Read_Call) 
RunAndReturn(run func(ctx context.Con
        return _c
 }
 
+// Reset provides a mock function for the type MockTransportInstance
+func (_mock *MockTransportInstance) Reset() {
+       _mock.Called()
+       return
+}
+
+// MockTransportInstance_Reset_Call is a *mock.Call that shadows Run/Return 
methods with type explicit version for method 'Reset'
+type MockTransportInstance_Reset_Call struct {
+       *mock.Call
+}
+
+// Reset is a helper method to define mock.On call
+func (_e *MockTransportInstance_Expecter) Reset() 
*MockTransportInstance_Reset_Call {
+       return &MockTransportInstance_Reset_Call{Call: _e.mock.On("Reset")}
+}
+
+func (_c *MockTransportInstance_Reset_Call) Run(run func()) 
*MockTransportInstance_Reset_Call {
+       _c.Call.Run(func(args mock.Arguments) {
+               run()
+       })
+       return _c
+}
+
+func (_c *MockTransportInstance_Reset_Call) Return() 
*MockTransportInstance_Reset_Call {
+       _c.Call.Return()
+       return _c
+}
+
+func (_c *MockTransportInstance_Reset_Call) RunAndReturn(run func()) 
*MockTransportInstance_Reset_Call {
+       _c.Run(run)
+       return _c
+}
+
 // String provides a mock function for the type MockTransportInstance
 func (_mock *MockTransportInstance) String() string {
        ret := _mock.Called()
diff --git a/plc4go/spi/testutils/TestUtils.go 
b/plc4go/spi/testutils/TestUtils.go
index d2ebb9498c..9a4b370b30 100644
--- a/plc4go/spi/testutils/TestUtils.go
+++ b/plc4go/spi/testutils/TestUtils.go
@@ -34,7 +34,6 @@ import (
 
        "github.com/ajankovic/xdiff"
        "github.com/ajankovic/xdiff/parser"
-       "github.com/apache/plc4x/plc4go/pkg/api/logging"
        "github.com/fatih/color"
        "github.com/pkg/errors"
        "github.com/rs/zerolog"
@@ -42,6 +41,7 @@ import (
        "github.com/rs/zerolog/pkgerrors"
        "github.com/stretchr/testify/assert"
 
+       "github.com/apache/plc4x/plc4go/pkg/api/logging"
        "github.com/apache/plc4x/plc4go/spi/options"
        "github.com/apache/plc4x/plc4go/spi/pool"
        "github.com/apache/plc4x/plc4go/spi/transactions"
diff --git a/plc4go/spi/testutils/mocks_test.go 
b/plc4go/spi/testutils/mocks_test.go
index d3054b67ae..1f5f9fd557 100644
--- a/plc4go/spi/testutils/mocks_test.go
+++ b/plc4go/spi/testutils/mocks_test.go
@@ -690,6 +690,39 @@ func (_c *MockTestTransportInstance_Read_Call) 
RunAndReturn(run func(ctx context
        return _c
 }
 
+// Reset provides a mock function for the type MockTestTransportInstance
+func (_mock *MockTestTransportInstance) Reset() {
+       _mock.Called()
+       return
+}
+
+// MockTestTransportInstance_Reset_Call is a *mock.Call that shadows 
Run/Return methods with type explicit version for method 'Reset'
+type MockTestTransportInstance_Reset_Call struct {
+       *mock.Call
+}
+
+// Reset is a helper method to define mock.On call
+func (_e *MockTestTransportInstance_Expecter) Reset() 
*MockTestTransportInstance_Reset_Call {
+       return &MockTestTransportInstance_Reset_Call{Call: _e.mock.On("Reset")}
+}
+
+func (_c *MockTestTransportInstance_Reset_Call) Run(run func()) 
*MockTestTransportInstance_Reset_Call {
+       _c.Call.Run(func(args mock.Arguments) {
+               run()
+       })
+       return _c
+}
+
+func (_c *MockTestTransportInstance_Reset_Call) Return() 
*MockTestTransportInstance_Reset_Call {
+       _c.Call.Return()
+       return _c
+}
+
+func (_c *MockTestTransportInstance_Reset_Call) RunAndReturn(run func()) 
*MockTestTransportInstance_Reset_Call {
+       _c.Run(run)
+       return _c
+}
+
 // String provides a mock function for the type MockTestTransportInstance
 func (_mock *MockTestTransportInstance) String() string {
        ret := _mock.Called()
diff --git a/plc4go/spi/transactions/mocks_test.go 
b/plc4go/spi/transactions/mocks_test.go
index c223abb423..687dd15ad2 100644
--- a/plc4go/spi/transactions/mocks_test.go
+++ b/plc4go/spi/transactions/mocks_test.go
@@ -294,7 +294,7 @@ func (_c *MockRequestTransaction_String_Call) 
RunAndReturn(run func() string) *M
 
 // Submit provides a mock function for the type MockRequestTransaction
 func (_mock *MockRequestTransaction) Submit(operationInfo string, operation 
RequestTransactionRunnable) {
-       _mock.Called(operation)
+       _mock.Called(operationInfo, operation)
        return
 }
 
@@ -304,19 +304,25 @@ type MockRequestTransaction_Submit_Call struct {
 }
 
 // Submit is a helper method to define mock.On call
+//   - operationInfo string
 //   - operation RequestTransactionRunnable
-func (_e *MockRequestTransaction_Expecter) Submit(operation interface{}) 
*MockRequestTransaction_Submit_Call {
-       return &MockRequestTransaction_Submit_Call{Call: _e.mock.On("Submit", 
operation)}
+func (_e *MockRequestTransaction_Expecter) Submit(operationInfo interface{}, 
operation interface{}) *MockRequestTransaction_Submit_Call {
+       return &MockRequestTransaction_Submit_Call{Call: _e.mock.On("Submit", 
operationInfo, operation)}
 }
 
-func (_c *MockRequestTransaction_Submit_Call) Run(run func(operation 
RequestTransactionRunnable)) *MockRequestTransaction_Submit_Call {
+func (_c *MockRequestTransaction_Submit_Call) Run(run func(operationInfo 
string, operation RequestTransactionRunnable)) 
*MockRequestTransaction_Submit_Call {
        _c.Call.Run(func(args mock.Arguments) {
-               var arg0 RequestTransactionRunnable
+               var arg0 string
                if args[0] != nil {
-                       arg0 = args[0].(RequestTransactionRunnable)
+                       arg0 = args[0].(string)
+               }
+               var arg1 RequestTransactionRunnable
+               if args[1] != nil {
+                       arg1 = args[1].(RequestTransactionRunnable)
                }
                run(
                        arg0,
+                       arg1,
                )
        })
        return _c
@@ -327,7 +333,7 @@ func (_c *MockRequestTransaction_Submit_Call) Return() 
*MockRequestTransaction_S
        return _c
 }
 
-func (_c *MockRequestTransaction_Submit_Call) RunAndReturn(run func(operation 
RequestTransactionRunnable)) *MockRequestTransaction_Submit_Call {
+func (_c *MockRequestTransaction_Submit_Call) RunAndReturn(run 
func(operationInfo string, operation RequestTransactionRunnable)) 
*MockRequestTransaction_Submit_Call {
        _c.Run(run)
        return _c
 }
@@ -495,16 +501,16 @@ func (_c 
*MockRequestTransactionManager_SetNumberOfConcurrentRequests_Call) RunA
 }
 
 // StartTransaction provides a mock function for the type 
MockRequestTransactionManager
-func (_mock *MockRequestTransactionManager) StartTransaction(string) 
RequestTransaction {
-       ret := _mock.Called()
+func (_mock *MockRequestTransactionManager) StartTransaction(transactionInfo 
string) RequestTransaction {
+       ret := _mock.Called(transactionInfo)
 
        if len(ret) == 0 {
                panic("no return value specified for StartTransaction")
        }
 
        var r0 RequestTransaction
-       if returnFunc, ok := ret.Get(0).(func() RequestTransaction); ok {
-               r0 = returnFunc()
+       if returnFunc, ok := ret.Get(0).(func(string) RequestTransaction); ok {
+               r0 = returnFunc(transactionInfo)
        } else {
                if ret.Get(0) != nil {
                        r0 = ret.Get(0).(RequestTransaction)
@@ -519,13 +525,20 @@ type MockRequestTransactionManager_StartTransaction_Call 
struct {
 }
 
 // StartTransaction is a helper method to define mock.On call
-func (_e *MockRequestTransactionManager_Expecter) StartTransaction() 
*MockRequestTransactionManager_StartTransaction_Call {
-       return &MockRequestTransactionManager_StartTransaction_Call{Call: 
_e.mock.On("StartTransaction")}
+//   - transactionInfo string
+func (_e *MockRequestTransactionManager_Expecter) 
StartTransaction(transactionInfo interface{}) 
*MockRequestTransactionManager_StartTransaction_Call {
+       return &MockRequestTransactionManager_StartTransaction_Call{Call: 
_e.mock.On("StartTransaction", transactionInfo)}
 }
 
-func (_c *MockRequestTransactionManager_StartTransaction_Call) Run(run func()) 
*MockRequestTransactionManager_StartTransaction_Call {
+func (_c *MockRequestTransactionManager_StartTransaction_Call) Run(run 
func(transactionInfo string)) 
*MockRequestTransactionManager_StartTransaction_Call {
        _c.Call.Run(func(args mock.Arguments) {
-               run()
+               var arg0 string
+               if args[0] != nil {
+                       arg0 = args[0].(string)
+               }
+               run(
+                       arg0,
+               )
        })
        return _c
 }
@@ -535,7 +548,7 @@ func (_c 
*MockRequestTransactionManager_StartTransaction_Call) Return(requestTra
        return _c
 }
 
-func (_c *MockRequestTransactionManager_StartTransaction_Call) 
RunAndReturn(run func() RequestTransaction) 
*MockRequestTransactionManager_StartTransaction_Call {
+func (_c *MockRequestTransactionManager_StartTransaction_Call) 
RunAndReturn(run func(transactionInfo string) RequestTransaction) 
*MockRequestTransactionManager_StartTransaction_Call {
        _c.Call.Return(run)
        return _c
 }
diff --git a/plc4go/spi/transports/TransportInstance.go 
b/plc4go/spi/transports/TransportInstance.go
index b49f0a8904..291523f53a 100644
--- a/plc4go/spi/transports/TransportInstance.go
+++ b/plc4go/spi/transports/TransportInstance.go
@@ -45,4 +45,6 @@ type TransportInstance interface {
        Read(ctx context.Context, numBytes uint32) ([]byte, error)
        // Write writes data to the transport
        Write(ctx context.Context, data []byte) error
+       // Reset resets the transport instance
+       Reset()
 }
diff --git a/plc4go/spi/transports/mocks_test.go 
b/plc4go/spi/transports/mocks_test.go
index b21396a3d3..53a6cde7cd 100644
--- a/plc4go/spi/transports/mocks_test.go
+++ b/plc4go/spi/transports/mocks_test.go
@@ -931,6 +931,39 @@ func (_c *MockTransportInstance_Read_Call) 
RunAndReturn(run func(ctx context.Con
        return _c
 }
 
+// Reset provides a mock function for the type MockTransportInstance
+func (_mock *MockTransportInstance) Reset() {
+       _mock.Called()
+       return
+}
+
+// MockTransportInstance_Reset_Call is a *mock.Call that shadows Run/Return 
methods with type explicit version for method 'Reset'
+type MockTransportInstance_Reset_Call struct {
+       *mock.Call
+}
+
+// Reset is a helper method to define mock.On call
+func (_e *MockTransportInstance_Expecter) Reset() 
*MockTransportInstance_Reset_Call {
+       return &MockTransportInstance_Reset_Call{Call: _e.mock.On("Reset")}
+}
+
+func (_c *MockTransportInstance_Reset_Call) Run(run func()) 
*MockTransportInstance_Reset_Call {
+       _c.Call.Run(func(args mock.Arguments) {
+               run()
+       })
+       return _c
+}
+
+func (_c *MockTransportInstance_Reset_Call) Return() 
*MockTransportInstance_Reset_Call {
+       _c.Call.Return()
+       return _c
+}
+
+func (_c *MockTransportInstance_Reset_Call) RunAndReturn(run func()) 
*MockTransportInstance_Reset_Call {
+       _c.Run(run)
+       return _c
+}
+
 // String provides a mock function for the type MockTransportInstance
 func (_mock *MockTransportInstance) String() string {
        ret := _mock.Called()
diff --git a/plc4go/spi/transports/pcap/TransportInstance.go 
b/plc4go/spi/transports/pcap/TransportInstance.go
index 40a2218fa3..5043d20ee8 100644
--- a/plc4go/spi/transports/pcap/TransportInstance.go
+++ b/plc4go/spi/transports/pcap/TransportInstance.go
@@ -169,6 +169,10 @@ func (m *TransportInstance) Connect(ctx context.Context) 
error {
        return nil
 }
 
+func (m *TransportInstance) Reset() {
+       // No-Op
+}
+
 func (m *TransportInstance) Close() error {
        defer utils.StopWarn(m.log)()
        m.stateChangeMutex.Lock()
diff --git a/plc4go/spi/transports/serial/TransportInstance.go 
b/plc4go/spi/transports/serial/TransportInstance.go
index cfcdeaf118..9edad672f6 100644
--- a/plc4go/spi/transports/serial/TransportInstance.go
+++ b/plc4go/spi/transports/serial/TransportInstance.go
@@ -98,6 +98,10 @@ func (m *TransportInstance) Connect(ctx context.Context) 
error {
        return nil
 }
 
+func (m *TransportInstance) Reset() {
+       // No-Op
+}
+
 func (m *TransportInstance) Close() error {
        defer utils.StopWarn(m.log)()
        m.stateChangeMutex.Lock()
diff --git a/plc4go/spi/transports/tcp/TransportInstance.go 
b/plc4go/spi/transports/tcp/TransportInstance.go
index 5e442bc781..1ef966cbe2 100644
--- a/plc4go/spi/transports/tcp/TransportInstance.go
+++ b/plc4go/spi/transports/tcp/TransportInstance.go
@@ -94,6 +94,17 @@ func (m *TransportInstance) Connect(ctx context.Context) 
error {
        return nil
 }
 
+func (m *TransportInstance) Reset() {
+       if m.tcpConn == nil {
+               m.log.Trace().Msg("No connection to reset")
+               return
+       }
+       _ = m.tcpConn.SetReadDeadline(time.Now().Add(1))
+       _, _ = m.tcpConn.Read(make([]byte, 4096))
+       m.reader = bufio.NewReader(m.tcpConn)
+       m.log.Trace().Msg("Connection reset")
+}
+
 func (m *TransportInstance) Close() error {
        defer utils.StopWarn(m.log)()
        m.stateChangeMutex.Lock()
diff --git a/plc4go/spi/transports/test/TransportInstance.go 
b/plc4go/spi/transports/test/TransportInstance.go
index 70a39b504f..9bbdbceb5e 100644
--- a/plc4go/spi/transports/test/TransportInstance.go
+++ b/plc4go/spi/transports/test/TransportInstance.go
@@ -318,6 +318,10 @@ func (m *TransportInstance) DrainWriteBuffer(numBytes 
uint32) []byte {
        return data
 }
 
+func (m *TransportInstance) Reset() {
+       // No-Op
+}
+
 func (m *TransportInstance) String() string {
        return "test"
 }
diff --git a/plc4go/spi/transports/udp/TransportInstance.go 
b/plc4go/spi/transports/udp/TransportInstance.go
index 33ce22b4fb..a708f52cb7 100644
--- a/plc4go/spi/transports/udp/TransportInstance.go
+++ b/plc4go/spi/transports/udp/TransportInstance.go
@@ -26,6 +26,7 @@ import (
        "net"
        "sync"
        "sync/atomic"
+       "time"
 
        "github.com/libp2p/go-reuseport"
        "github.com/pkg/errors"
@@ -121,6 +122,17 @@ func (m *TransportInstance) Connect(ctx context.Context) 
error {
        return nil
 }
 
+func (m *TransportInstance) Reset() {
+       if m.udpConn == nil {
+               m.log.Trace().Msg("No connection to reset")
+               return
+       }
+       _ = m.udpConn.SetReadDeadline(time.Now().Add(1))
+       _, _, _ = m.udpConn.ReadFromUDP(make([]byte, 4096))
+       m.reader = bufio.NewReader(m.udpConn)
+       m.log.Trace().Msg("Connection reset")
+}
+
 func (m *TransportInstance) Close() error {
        defer utils.StopWarn(m.log)()
        m.stateChangeMutex.Lock()

Reply via email to