This is an automated email from the ASF dual-hosted git repository.

sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git

commit 750dff6481157985abf314e7bf026b0aa9d38a22
Author: Sebastian Rühl <[email protected]>
AuthorDate: Thu Nov 13 10:36:28 2025 +0100

    test(plc4go): fixed issues with concurrency
---
 plc4go/internal/cbus/Browser_test.go               |  6 +-
 plc4go/internal/cbus/Connection.go                 | 22 ++++---
 plc4go/internal/cbus/Connection_test.go            | 34 ++++-------
 plc4go/internal/cbus/Driver_test.go                |  8 +--
 plc4go/internal/cbus/MessageCodec.go               |  5 +-
 plc4go/internal/cbus/MessageCodec_test.go          | 23 +++++--
 plc4go/internal/cbus/Reader_test.go                | 31 +++++-----
 plc4go/internal/cbus/Writer_test.go                |  6 +-
 plc4go/spi/default/DefaultCodec.go                 |  7 ++-
 plc4go/spi/default/DefaultCodec_test.go            |  6 +-
 .../SingleItemRequestInterceptor_test.go           | 12 ++--
 plc4go/spi/transports/test/TransportInstance.go    | 71 ++++++++++++----------
 .../spi/transports/test/TransportInstance_test.go  |  1 +
 plc4go/spi/transports/test/Transport_test.go       | 35 ++++++-----
 .../test/transportInstanceDrivenExtendedReader.go  |  4 --
 15 files changed, 147 insertions(+), 124 deletions(-)

diff --git a/plc4go/internal/cbus/Browser_test.go 
b/plc4go/internal/cbus/Browser_test.go
index 308fa89513..db63663c2d 100644
--- a/plc4go/internal/cbus/Browser_test.go
+++ b/plc4go/internal/cbus/Browser_test.go
@@ -326,7 +326,7 @@ func TestBrowser_browseUnitInfo(t *testing.T) {
 
                                args.ctx = testutils.TestContext(t)
                                var cancelFunc context.CancelFunc
-                               args.ctx, cancelFunc = 
context.WithTimeout(args.ctx, 2*time.Second)
+                               args.ctx, cancelFunc = 
context.WithTimeout(args.ctx, 20*time.Second)
                                t.Cleanup(cancelFunc)
                        },
                        wantResponseCode: apiModel.PlcResponseCode_OK,
@@ -395,7 +395,7 @@ func TestBrowser_extractUnits(t *testing.T) {
                        setup: func(t *testing.T, fields *fields, args *args) {
                                args.ctx = testutils.TestContext(t)
                                var cancelFunc context.CancelFunc
-                               args.ctx, cancelFunc = 
context.WithTimeout(args.ctx, 2*time.Second)
+                               args.ctx, cancelFunc = 
context.WithTimeout(args.ctx, 20*time.Second)
                                t.Cleanup(cancelFunc)
                        },
                        want:    
[]readWriteModel.UnitAddress{readWriteModel.NewUnitAddress(2)},
@@ -595,7 +595,7 @@ func TestBrowser_getInstalledUnitAddressBytes(t *testing.T) 
{
 
                                args.ctx = testutils.TestContext(t)
                                var cancelFunc context.CancelFunc
-                               args.ctx, cancelFunc = 
context.WithTimeout(args.ctx, 2*time.Second)
+                               args.ctx, cancelFunc = 
context.WithTimeout(args.ctx, 20*time.Second)
                                t.Cleanup(cancelFunc)
                        },
                        want: map[byte]any{
diff --git a/plc4go/internal/cbus/Connection.go 
b/plc4go/internal/cbus/Connection.go
index 5cddb6034a..203317d86b 100644
--- a/plc4go/internal/cbus/Connection.go
+++ b/plc4go/internal/cbus/Connection.go
@@ -234,10 +234,10 @@ func (c *Connection) setupConnection(ctx context.Context, 
ch chan plc4go.PlcConn
        cbusOptions := &c.messageCodec.cbusOptions
        requestContext := &c.messageCodec.requestContext
 
-       if !c.sendReset(ctx, ch, cbusOptions, requestContext, false) {
+       if !c.sendReset(ctx, ch, false) {
                c.log.Warn().Msg("First reset failed")
                // We try a second reset in case we get a power up
-               if !c.sendReset(ctx, ch, cbusOptions, requestContext, true) {
+               if !c.sendReset(ctx, ch, true) {
                        c.log.Trace().Msg("Reset failed")
                        return
                }
@@ -343,7 +343,7 @@ func (c *Connection) startSubscriptionHandler() {
        })
 }
 
-func (c *Connection) sendReset(ctx context.Context, ch chan 
plc4go.PlcConnectionConnectResult, cbusOptions *readWriteModel.CBusOptions, 
requestContext *readWriteModel.RequestContext, sendOutErrorNotification bool) 
(ok bool) {
+func (c *Connection) sendReset(ctx context.Context, ch chan 
plc4go.PlcConnectionConnectResult, sendOutErrorNotification bool) (ok bool) {
        c.log.Debug().Bool("sendOutErrorNotification", 
sendOutErrorNotification).Msg("Send a reset")
        requestTypeReset := readWriteModel.RequestType_RESET
        requestReset := readWriteModel.NewRequestReset(
@@ -442,7 +442,7 @@ func (c *Connection) sendReset(ctx context.Context, ch chan 
plc4go.PlcConnection
        }
 
        startTime := time.Now()
-       timeout := time.NewTimer(time.Millisecond * 500)
+       timer := time.NewTimer(ttl)
        select {
        case <-receivedResetEchoChan:
                c.log.Debug().Msg("We received the echo")
@@ -453,7 +453,7 @@ func (c *Connection) sendReset(ctx context.Context, ch chan 
plc4go.PlcConnection
                        c.log.Trace().Err(err).Msg("connect failed")
                }
                return false
-       case timeout := <-timeout.C:
+       case timeout := <-timer.C:
                if sendOutErrorNotification {
                        c.fireConnectionError(errors.Errorf("Timeout after %v", 
timeout.Sub(startTime)), ch)
                } else {
@@ -593,27 +593,33 @@ func (c *Connection) sendCalDataWrite(ctx 
context.Context, ch chan plc4go.PlcCon
                        return nil
                },
                func(err error) error {
+                       c.log.Trace().Err(err).Msg("got error processing 
request")
                        select {
                        case directCommandAckErrorChan <- errors.Wrap(err, "got 
error processing request"):
+                               c.log.Trace().Msg("error redirected")
                        default:
+                               c.log.Trace().Err(err).Msg("error discarded")
                        }
                        return nil
                },
                ttl,
        ); err != nil {
+               c.log.Trace().Err(err).Msg("got error sending request")
                c.fireConnectionError(errors.Wrap(err, "Error during sending of 
write request"), ch)
                return false
        }
 
        startTime := time.Now()
-       timeout := time.NewTimer(60 * time.Second)
+       timer := time.NewTimer(60 * time.Second)
        select {
        case <-directCommandAckChan:
-               c.log.Debug().Msg("We received the ack")
+               c.log.Trace().Msg("We received the ack")
        case err := <-directCommandAckErrorChan:
+               c.log.Trace().Err(err).Msg("got error processing request")
                c.fireConnectionError(errors.Wrap(err, "Error receiving of 
ack"), ch)
                return false
-       case timeout := <-timeout.C:
+       case timeout := <-timer.C:
+               c.log.Trace().Dur("timeout", 
timeout.Sub(startTime)).Msg("Timeout")
                c.fireConnectionError(errors.Errorf("Timeout after %v", 
timeout.Sub(startTime)), ch)
                return false
        }
diff --git a/plc4go/internal/cbus/Connection_test.go 
b/plc4go/internal/cbus/Connection_test.go
index b3c33f76e7..5ace85f392 100644
--- a/plc4go/internal/cbus/Connection_test.go
+++ b/plc4go/internal/cbus/Connection_test.go
@@ -143,7 +143,7 @@ func TestConnection_Connect(t *testing.T) {
                name         string
                fields       fields
                args         args
-               setup        func(t *testing.T, fields *fields)
+               setup        func(*testing.T, *fields, *args)
                wantAsserter func(*testing.T, <-chan 
plc4go.PlcConnectionConnectResult) bool
        }{
                {
@@ -166,10 +166,7 @@ func TestConnection_Connect(t *testing.T) {
                                connectionId: "connectionId13",
                                tracer:       nil,
                        },
-                       args: args{
-                               ctx: t.Context(),
-                       },
-                       setup: func(t *testing.T, fields *fields) {
+                       setup: func(t *testing.T, fields *fields, args *args) {
                                _options := 
testutils.EnrichOptionsWithOptionsForTesting(t)
 
                                transport := test.NewTransport(_options...)
@@ -180,6 +177,10 @@ func TestConnection_Connect(t *testing.T) {
                                        assert.Error(t, codec.Disconnect())
                                })
                                fields.messageCodec = codec
+
+                               var cancelFunc context.CancelFunc
+                               args.ctx, cancelFunc = 
context.WithTimeout(args.ctx, 2*time.Second)
+                               t.Cleanup(cancelFunc)
                        },
                        wantAsserter: func(t *testing.T, results <-chan 
plc4go.PlcConnectionConnectResult) bool {
                                assert.NotNil(t, results)
@@ -194,7 +195,7 @@ func TestConnection_Connect(t *testing.T) {
        for _, tt := range tests {
                t.Run(tt.name, func(t *testing.T) {
                        if tt.setup != nil {
-                               tt.setup(t, &tt.fields)
+                               tt.setup(t, &tt.fields, &tt.args)
                        }
                        c := &Connection{
                                messageCodec:  tt.fields.messageCodec,
@@ -887,8 +888,6 @@ func TestConnection_sendReset(t *testing.T) {
        type args struct {
                ctx                      context.Context
                ch                       chan plc4go.PlcConnectionConnectResult
-               cbusOptions              *readWriteModel.CBusOptions
-               requestContext           *readWriteModel.RequestContext
                sendOutErrorNotification bool
        }
        tests := []struct {
@@ -901,15 +900,7 @@ func TestConnection_sendReset(t *testing.T) {
                {
                        name: "send reset",
                        args: args{
-                               ch: make(chan 
plc4go.PlcConnectionConnectResult, 1),
-                               cbusOptions: func() *readWriteModel.CBusOptions 
{
-                                       var cBusOptions 
readWriteModel.CBusOptions = readWriteModel.NewCBusOptions(false, false, false, 
false, false, false, false, false, false)
-                                       return &cBusOptions
-                               }(),
-                               requestContext: func() 
*readWriteModel.RequestContext {
-                                       var requestContext 
readWriteModel.RequestContext = readWriteModel.NewRequestContext(false)
-                                       return &requestContext
-                               }(),
+                               ch:                       make(chan 
plc4go.PlcConnectionConnectResult, 1),
                                sendOutErrorNotification: false,
                        },
                        setup: func(t *testing.T, fields *fields, args *args) {
@@ -926,7 +917,7 @@ func TestConnection_sendReset(t *testing.T) {
 
                                args.ctx = testutils.TestContext(t)
                                var cancelFunc context.CancelFunc
-                               args.ctx, cancelFunc = 
context.WithTimeout(args.ctx, 2*time.Second)
+                               args.ctx, cancelFunc = 
context.WithTimeout(args.ctx, 20*time.Second)
                                t.Cleanup(cancelFunc)
                        },
                        wantOk: false,
@@ -948,7 +939,7 @@ func TestConnection_sendReset(t *testing.T) {
                                log:           
testutils.ProduceTestingLogger(t),
                        }
                        c.DefaultConnection = _default.NewDefaultConnection(c, 
testutils.EnrichOptionsWithOptionsForTesting(t)...)
-                       assert.Equalf(t, tt.wantOk, c.sendReset(tt.args.ctx, 
tt.args.ch, tt.args.cbusOptions, tt.args.requestContext, 
tt.args.sendOutErrorNotification), "sendReset(%v, %v, %v, %v, %v)", 
tt.args.ctx, tt.args.ch, tt.args.cbusOptions, tt.args.requestContext, 
tt.args.sendOutErrorNotification)
+                       assert.Equalf(t, tt.wantOk, c.sendReset(tt.args.ctx, 
tt.args.ch, tt.args.sendOutErrorNotification), "sendReset(%v, %v, %v)", 
tt.args.ctx, tt.args.ch, tt.args.sendOutErrorNotification)
                })
        }
 }
@@ -1076,6 +1067,7 @@ func TestConnection_setInterface1PowerUpSettings(t 
*testing.T) {
                                codec := NewMessageCodec(ti, _options...)
                                require.NoError(t, codec.Connect(t.Context()))
                                t.Cleanup(func() {
+                                       t.Log("disconnecting codec")
                                        assert.Error(t, codec.Disconnect())
                                })
                                fields.messageCodec = codec
@@ -1306,7 +1298,7 @@ func TestConnection_setupConnection(t *testing.T) {
 
                                args.ctx = testutils.TestContext(t)
                                var cancelFunc context.CancelFunc
-                               args.ctx, cancelFunc = 
context.WithTimeout(args.ctx, 2*time.Second)
+                               args.ctx, cancelFunc = 
context.WithTimeout(args.ctx, 20*time.Second)
                                t.Cleanup(cancelFunc)
                        },
                        validator: func(t *testing.T, result 
plc4go.PlcConnectionConnectResult) {
@@ -1644,7 +1636,7 @@ func TestConnection_setupConnection(t *testing.T) {
 
                                args.ctx = testutils.TestContext(t)
                                var cancelFunc context.CancelFunc
-                               args.ctx, cancelFunc = 
context.WithTimeout(args.ctx, 2*time.Second)
+                               args.ctx, cancelFunc = 
context.WithTimeout(args.ctx, 20*time.Second)
                                t.Cleanup(cancelFunc)
                        },
                        validator: func(t *testing.T, result 
plc4go.PlcConnectionConnectResult) {
diff --git a/plc4go/internal/cbus/Driver_test.go 
b/plc4go/internal/cbus/Driver_test.go
index ede88034df..0725d531ca 100644
--- a/plc4go/internal/cbus/Driver_test.go
+++ b/plc4go/internal/cbus/Driver_test.go
@@ -73,7 +73,7 @@ func TestDriver_GetConnection(t *testing.T) {
                        setup: func(t *testing.T, fields *fields, args *args) {
                                args.ctx = testutils.TestContext(t)
                                var cancelFunc context.CancelFunc
-                               args.ctx, cancelFunc = 
context.WithTimeout(args.ctx, 2*time.Second)
+                               args.ctx, cancelFunc = 
context.WithTimeout(args.ctx, 20*time.Second)
                                t.Cleanup(cancelFunc)
                        },
                        wantVerifier: func(t *testing.T, results <-chan 
plc4go.PlcConnectionConnectResult) bool {
@@ -108,7 +108,7 @@ func TestDriver_GetConnection(t *testing.T) {
                                args.transports["test"] = 
test.NewTransport(testutils.EnrichOptionsWithOptionsForTesting(t)...)
                                args.ctx = testutils.TestContext(t)
                                var cancelFunc context.CancelFunc
-                               args.ctx, cancelFunc = 
context.WithTimeout(args.ctx, 2*time.Second)
+                               args.ctx, cancelFunc = 
context.WithTimeout(args.ctx, 20*time.Second)
                                t.Cleanup(cancelFunc)
                        },
                        wantVerifier: func(t *testing.T, results <-chan 
plc4go.PlcConnectionConnectResult) bool {
@@ -144,7 +144,7 @@ func TestDriver_GetConnection(t *testing.T) {
                                args.transports["test"] = 
test.NewTransport(testutils.EnrichOptionsWithOptionsForTesting(t)...)
                                args.ctx = testutils.TestContext(t)
                                var cancelFunc context.CancelFunc
-                               args.ctx, cancelFunc = 
context.WithTimeout(args.ctx, 2*time.Second)
+                               args.ctx, cancelFunc = 
context.WithTimeout(args.ctx, 20*time.Second)
                                t.Cleanup(cancelFunc)
                        },
                        wantVerifier: func(t *testing.T, results <-chan 
plc4go.PlcConnectionConnectResult) bool {
@@ -177,7 +177,7 @@ func TestDriver_GetConnection(t *testing.T) {
                                args.transports["test"] = 
test.NewTransport(testutils.EnrichOptionsWithOptionsForTesting(t)...)
                                args.ctx = testutils.TestContext(t)
                                var cancelFunc context.CancelFunc
-                               args.ctx, cancelFunc = 
context.WithTimeout(args.ctx, 2*time.Second)
+                               args.ctx, cancelFunc = 
context.WithTimeout(args.ctx, 20*time.Second)
                                t.Cleanup(cancelFunc)
                        },
                        wantVerifier: func(t *testing.T, results <-chan 
plc4go.PlcConnectionConnectResult) bool {
diff --git a/plc4go/internal/cbus/MessageCodec.go 
b/plc4go/internal/cbus/MessageCodec.go
index 7f6c18c14b..5325da1c33 100644
--- a/plc4go/internal/cbus/MessageCodec.go
+++ b/plc4go/internal/cbus/MessageCodec.go
@@ -137,16 +137,18 @@ func (m *MessageCodec) Receive(ctx context.Context, 
timeout time.Duration) (spi.
                if err := ti.FillBuffer(
                        ctx,
                        func(pos uint, currentByte byte, reader 
transports.ExtendedReader) bool {
-                               m.log.Trace().Uint8("byte", 
currentByte).Msg("current byte")
+                               m.log.Trace().Uint("pos", pos).Uint8("byte", 
currentByte).Str("rune", string(rune(currentByte))).Msg("current byte")
                                switch currentByte {
                                case
                                        readWriteModel.ResponseTermination_CR,
                                        readWriteModel.ResponseTermination_LF:
+                                       m.log.Trace().Msg("Found termination 
byte")
                                        return false
                                case 
byte(readWriteModel.ConfirmationType_CONFIRMATION_SUCCESSFUL):
                                        confirmation = true
                                        // In case we have directly more data 
in the buffer after a confirmation
                                        _, err := reader.Peek(int(pos + 1))
+                                       m.log.Trace().Err(err).Msg("Peeking one 
more")
                                        return err == nil
                                case
                                        
byte(readWriteModel.ConfirmationType_NOT_TRANSMITTED_TO_MANY_RE_TRANSMISSIONS),
@@ -155,6 +157,7 @@ func (m *MessageCodec) Receive(ctx context.Context, timeout 
time.Duration) (spi.
                                        
byte(readWriteModel.ConfirmationType_NOT_TRANSMITTED_TOO_LONG),
                                        
byte(readWriteModel.ConfirmationType_CHECKSUM_FAILURE):
                                        confirmation = true
+                                       m.log.Trace().Msg("Found confirmation")
                                        return false
                                default:
                                        return true
diff --git a/plc4go/internal/cbus/MessageCodec_test.go 
b/plc4go/internal/cbus/MessageCodec_test.go
index 012434cc09..09efc0ea3a 100644
--- a/plc4go/internal/cbus/MessageCodec_test.go
+++ b/plc4go/internal/cbus/MessageCodec_test.go
@@ -20,6 +20,7 @@
 package cbus
 
 import (
+       "context"
        "fmt"
        "testing"
        "time"
@@ -117,6 +118,7 @@ func TestMessageCodec_Receive(t *testing.T) {
                monitoredSALs  chan readWriteModel.MonitoredSAL
        }
        type args struct {
+               ctx     context.Context
                timeout time.Duration
        }
        tests := []struct {
@@ -137,6 +139,7 @@ func TestMessageCodec_Receive(t *testing.T) {
                                monitoredSALs:  nil,
                        },
                        args: args{
+                               ctx:     t.Context(),
                                timeout: 10 * time.Second,
                        },
                        setup: func(t *testing.T, fields *fields) {
@@ -162,6 +165,7 @@ func TestMessageCodec_Receive(t *testing.T) {
                                monitoredSALs:  nil,
                        },
                        args: args{
+                               ctx:     t.Context(),
                                timeout: 10 * time.Second,
                        },
                        want: readWriteModel.NewCBusMessageToClient(
@@ -193,6 +197,7 @@ func TestMessageCodec_Receive(t *testing.T) {
                                monitoredSALs:  nil,
                        },
                        args: args{
+                               ctx:     t.Context(),
                                timeout: 10 * time.Second,
                        },
                        setup: func(t *testing.T, fields *fields) {
@@ -219,6 +224,7 @@ func TestMessageCodec_Receive(t *testing.T) {
                                monitoredSALs:  nil,
                        },
                        args: args{
+                               ctx:     t.Context(),
                                timeout: 10 * time.Second,
                        },
                        setup: func(t *testing.T, fields *fields) {
@@ -245,6 +251,7 @@ func TestMessageCodec_Receive(t *testing.T) {
                                monitoredSALs:  nil,
                        },
                        args: args{
+                               ctx:     t.Context(),
                                timeout: 10 * time.Second,
                        },
                        setup: func(t *testing.T, fields *fields) {
@@ -279,6 +286,7 @@ func TestMessageCodec_Receive(t *testing.T) {
                                monitoredSALs:  nil,
                        },
                        args: args{
+                               ctx:     t.Context(),
                                timeout: 10 * time.Second,
                        },
                        manipulator: func(t *testing.T, messageCodec 
*MessageCodec) {
@@ -326,6 +334,7 @@ func TestMessageCodec_Receive(t *testing.T) {
                                monitoredSALs:  nil,
                        },
                        args: args{
+                               ctx:     t.Context(),
                                timeout: 10 * time.Second,
                        },
                        setup: func(t *testing.T, fields *fields) {
@@ -523,6 +532,7 @@ func TestMessageCodec_Receive(t *testing.T) {
                                monitoredSALs:  nil,
                        },
                        args: args{
+                               ctx:     t.Context(),
                                timeout: 10 * time.Second,
                        },
                        manipulator: func(t *testing.T, messageCodec 
*MessageCodec) {
@@ -603,7 +613,7 @@ func TestMessageCodec_Receive(t *testing.T) {
                        if tt.manipulator != nil {
                                tt.manipulator(t, m)
                        }
-                       got, err := m.Receive(t.Context(), tt.args.timeout)
+                       got, err := m.Receive(tt.args.ctx, tt.args.timeout)
                        if !tt.wantErr(t, err, fmt.Sprintf("Receive()")) {
                                return
                        }
@@ -776,10 +786,10 @@ func Test_extractMMIAndSAL(t *testing.T) {
                message spi.Message
        }
        tests := []struct {
-               name  string
-               args  args
-               setup func(t *testing.T, args *args)
-               want  bool
+               name    string
+               args    args
+               setup   func(t *testing.T, args *args)
+               handled bool
        }{
                {
                        name: "extract it",
@@ -818,6 +828,7 @@ func Test_extractMMIAndSAL(t *testing.T) {
                                codec.monitoredSALs = make(chan 
readWriteModel.MonitoredSAL, 1)
                                args.codec = codec
                        },
+                       handled: true,
                },
        }
        for _, tt := range tests {
@@ -825,7 +836,7 @@ func Test_extractMMIAndSAL(t *testing.T) {
                        if tt.setup != nil {
                                tt.setup(t, &tt.args)
                        }
-                       assert.Equalf(t, tt.want, 
extractMMIAndSAL(testutils.ProduceTestingLogger(t))(t.Context(), tt.args.codec, 
tt.args.message), "extractMMIAndSAL(%v, %v)", tt.args.codec, tt.args.message)
+                       assert.Equalf(t, tt.handled, 
extractMMIAndSAL(testutils.ProduceTestingLogger(t))(t.Context(), tt.args.codec, 
tt.args.message), "extractMMIAndSAL(%v, %v) to be handled", tt.args.codec, 
tt.args.message)
                })
        }
 }
diff --git a/plc4go/internal/cbus/Reader_test.go 
b/plc4go/internal/cbus/Reader_test.go
index ac954a88d0..1466de5cec 100644
--- a/plc4go/internal/cbus/Reader_test.go
+++ b/plc4go/internal/cbus/Reader_test.go
@@ -102,7 +102,7 @@ func TestReader_Read(t *testing.T) {
                        setup: func(t *testing.T, fields *fields, args *args) {
                                args.ctx = testutils.TestContext(t)
                                var cancelFunc context.CancelFunc
-                               args.ctx, cancelFunc = 
context.WithTimeout(args.ctx, 2*time.Second)
+                               args.ctx, cancelFunc = 
context.WithTimeout(args.ctx, 20*time.Second)
                                t.Cleanup(cancelFunc)
                        },
                        wantAsserter: func(t *testing.T, results <-chan 
apiModel.PlcReadRequestResult) bool {
@@ -162,7 +162,7 @@ func TestReader_readSync(t *testing.T) {
                        setup: func(t *testing.T, fields *fields, args *args) {
                                args.ctx = testutils.TestContext(t)
                                var cancelFunc context.CancelFunc
-                               args.ctx, cancelFunc = 
context.WithTimeout(args.ctx, 2*time.Second)
+                               args.ctx, cancelFunc = 
context.WithTimeout(args.ctx, 20*time.Second)
                                t.Cleanup(cancelFunc)
                        },
                        resultEvaluator: func(t *testing.T, results chan 
apiModel.PlcReadRequestResult) bool {
@@ -214,7 +214,7 @@ func TestReader_readSync(t *testing.T) {
                                })
                                args.ctx = testutils.TestContext(t)
                                var cancelFunc context.CancelFunc
-                               args.ctx, cancelFunc = 
context.WithTimeout(args.ctx, 2*time.Second)
+                               args.ctx, cancelFunc = 
context.WithTimeout(args.ctx, 20*time.Second)
                                t.Cleanup(cancelFunc)
                        },
                        resultEvaluator: func(t *testing.T, results chan 
apiModel.PlcReadRequestResult) bool {
@@ -242,7 +242,7 @@ func TestReader_readSync(t *testing.T) {
                        setup: func(t *testing.T, fields *fields, args *args) {
                                args.ctx = testutils.TestContext(t)
                                var cancelFunc context.CancelFunc
-                               args.ctx, cancelFunc = 
context.WithTimeout(args.ctx, 2*time.Second)
+                               args.ctx, cancelFunc = 
context.WithTimeout(args.ctx, 20*time.Second)
                                t.Cleanup(cancelFunc)
                        },
                        resultEvaluator: func(t *testing.T, results chan 
apiModel.PlcReadRequestResult) bool {
@@ -321,7 +321,7 @@ func TestReader_readSync(t *testing.T) {
 
                                args.ctx = testutils.TestContext(t)
                                var cancelFunc context.CancelFunc
-                               args.ctx, cancelFunc = 
context.WithTimeout(args.ctx, 2*time.Second)
+                               args.ctx, cancelFunc = 
context.WithTimeout(args.ctx, 20*time.Second)
                                t.Cleanup(cancelFunc)
                        },
                        resultEvaluator: func(t *testing.T, results chan 
apiModel.PlcReadRequestResult) bool {
@@ -483,7 +483,7 @@ func TestReader_sendMessageOverTheWire(t *testing.T) {
 
                                args.ctx = testutils.TestContext(t)
                                var cancelFunc context.CancelFunc
-                               args.ctx, cancelFunc = 
context.WithTimeout(args.ctx, 2*time.Second)
+                               args.ctx, cancelFunc = 
context.WithTimeout(args.ctx, 20*time.Second)
                                t.Cleanup(cancelFunc)
                        },
                },
@@ -644,7 +644,7 @@ func TestReader_sendMessageOverTheWire(t *testing.T) {
 
                                args.ctx = testutils.TestContext(t)
                                var cancelFunc context.CancelFunc
-                               args.ctx, cancelFunc = 
context.WithTimeout(args.ctx, 2*time.Second)
+                               args.ctx, cancelFunc = 
context.WithTimeout(args.ctx, 20*time.Second)
                                t.Cleanup(cancelFunc)
                        },
                },
@@ -727,7 +727,7 @@ func TestReader_sendMessageOverTheWire(t *testing.T) {
 
                                args.ctx = testutils.TestContext(t)
                                var cancelFunc context.CancelFunc
-                               args.ctx, cancelFunc = 
context.WithTimeout(args.ctx, 2*time.Second)
+                               args.ctx, cancelFunc = 
context.WithTimeout(args.ctx, 20*time.Second)
                                t.Cleanup(cancelFunc)
                        },
                },
@@ -810,7 +810,7 @@ func TestReader_sendMessageOverTheWire(t *testing.T) {
 
                                args.ctx = testutils.TestContext(t)
                                var cancelFunc context.CancelFunc
-                               args.ctx, cancelFunc = 
context.WithTimeout(args.ctx, 2*time.Second)
+                               args.ctx, cancelFunc = 
context.WithTimeout(args.ctx, 20*time.Second)
                                t.Cleanup(cancelFunc)
                        },
                },
@@ -893,7 +893,7 @@ func TestReader_sendMessageOverTheWire(t *testing.T) {
 
                                args.ctx = testutils.TestContext(t)
                                var cancelFunc context.CancelFunc
-                               args.ctx, cancelFunc = 
context.WithTimeout(args.ctx, 2*time.Second)
+                               args.ctx, cancelFunc = 
context.WithTimeout(args.ctx, 20*time.Second)
                                t.Cleanup(cancelFunc)
                        },
                },
@@ -976,7 +976,7 @@ func TestReader_sendMessageOverTheWire(t *testing.T) {
 
                                args.ctx = testutils.TestContext(t)
                                var cancelFunc context.CancelFunc
-                               args.ctx, cancelFunc = 
context.WithTimeout(args.ctx, 2*time.Second)
+                               args.ctx, cancelFunc = 
context.WithTimeout(args.ctx, 20*time.Second)
                                t.Cleanup(cancelFunc)
                        },
                },
@@ -1059,7 +1059,7 @@ func TestReader_sendMessageOverTheWire(t *testing.T) {
 
                                args.ctx = testutils.TestContext(t)
                                var cancelFunc context.CancelFunc
-                               args.ctx, cancelFunc = 
context.WithTimeout(args.ctx, 2*time.Second)
+                               args.ctx, cancelFunc = 
context.WithTimeout(args.ctx, 20*time.Second)
                                t.Cleanup(cancelFunc)
                        },
                },
@@ -1142,7 +1142,7 @@ func TestReader_sendMessageOverTheWire(t *testing.T) {
 
                                args.ctx = testutils.TestContext(t)
                                var cancelFunc context.CancelFunc
-                               args.ctx, cancelFunc = 
context.WithTimeout(args.ctx, 2*time.Second)
+                               args.ctx, cancelFunc = 
context.WithTimeout(args.ctx, 20*time.Second)
                                t.Cleanup(cancelFunc)
                        },
                },
@@ -1161,12 +1161,11 @@ func TestReader_sendMessageOverTheWire(t *testing.T) {
                        }
                        m.sendMessageOverTheWire(tt.args.ctx, 
tt.args.transaction, tt.args.messageToSend, tt.args.addResponseCode(t), 
tt.args.tagName, tt.args.addPlcValue(t))
                        t.Log("Waiting now")
-                       timer := time.NewTimer(10 * time.Second)
                        select {
+                       case <-t.Context().Done():
+                               t.Log("aborted")
                        case <-ch:
                                t.Log("Done waiting")
-                       case <-timer.C:
-                               t.Error("Timeout")
                        }
                })
        }
diff --git a/plc4go/internal/cbus/Writer_test.go 
b/plc4go/internal/cbus/Writer_test.go
index 3bacea5d17..83db24aece 100644
--- a/plc4go/internal/cbus/Writer_test.go
+++ b/plc4go/internal/cbus/Writer_test.go
@@ -86,7 +86,7 @@ func TestWriter_Write(t *testing.T) {
                        setup: func(t *testing.T, fields *fields, args *args) {
                                args.ctx = testutils.TestContext(t)
                                var cancelFunc context.CancelFunc
-                               args.ctx, cancelFunc = 
context.WithTimeout(args.ctx, 2*time.Second)
+                               args.ctx, cancelFunc = 
context.WithTimeout(args.ctx, 20*time.Second)
                                t.Cleanup(cancelFunc)
                        },
                        wantAsserter: func(t *testing.T, results <-chan 
apiModel.PlcWriteRequestResult) bool {
@@ -112,7 +112,7 @@ func TestWriter_Write(t *testing.T) {
                        setup: func(t *testing.T, fields *fields, args *args) {
                                args.ctx = testutils.TestContext(t)
                                var cancelFunc context.CancelFunc
-                               args.ctx, cancelFunc = 
context.WithTimeout(args.ctx, 2*time.Second)
+                               args.ctx, cancelFunc = 
context.WithTimeout(args.ctx, 20*time.Second)
                                t.Cleanup(cancelFunc)
                        },
                        wantAsserter: func(t *testing.T, results <-chan 
apiModel.PlcWriteRequestResult) bool {
@@ -162,7 +162,7 @@ func TestWriter_Write(t *testing.T) {
                                setup: func(t *testing.T, fields *fields, args 
*args){
                                        args.ctx = testutils.TestContext(t)
                                        var cancelFunc context.CancelFunc
-                                       args.ctx, cancelFunc = 
context.WithTimeout(args.ctx, 2*time.Second)
+                                       args.ctx, cancelFunc = 
context.WithTimeout(args.ctx, 20*time.Second)
                                        t.Cleanup(cancelFunc)
                                },
                                wantAsserter: func(t *testing.T, results <-chan 
apiModel.PlcWriteRequestResult) bool {
diff --git a/plc4go/spi/default/DefaultCodec.go 
b/plc4go/spi/default/DefaultCodec.go
index f14d6efe31..7666f6f2b1 100644
--- a/plc4go/spi/default/DefaultCodec.go
+++ b/plc4go/spi/default/DefaultCodec.go
@@ -358,7 +358,10 @@ mainLoop:
                        continue mainLoop
                }
                nextExpire := m.TimeoutExpectations(now)
-               workerLog.Debug().Dur("nextExpire", nextExpire).Msg("waiting 
for next expire")
+               m.expectationsChangeMutex.RLock()
+               numberOfExpectations = len(m.expectations)
+               m.expectationsChangeMutex.RUnlock()
+               workerLog.Debug().Dur("nextExpire", 
nextExpire).Int("numberOfExpectations", numberOfExpectations).Msg("waiting for 
next expire")
                timer := time.NewTimer(nextExpire)
                select {
                case <-m.notifyExpireWorker:
@@ -471,7 +474,7 @@ mainLoop:
                        workerLog.Trace().Msg("Executing custom handling")
                        start := time.Now()
                        handled := m.customMessageHandling(m.ctx, 
m.DefaultCodecRequirements, message)
-                       workerLog.Trace().TimeDiff("elapsedTime", time.Now(), 
start).Msg("custom handling took elapsedTime")
+                       workerLog.Trace().TimeDiff("elapsedTime", time.Now(), 
start).Bool("handled", handled).Msg("custom handling took elapsedTime")
                        if handled {
                                workerLog.Trace().Msg("Custom handling handled 
the message")
                                continue mainLoop
diff --git a/plc4go/spi/default/DefaultCodec_test.go 
b/plc4go/spi/default/DefaultCodec_test.go
index a4b82fbcab..15ca290865 100644
--- a/plc4go/spi/default/DefaultCodec_test.go
+++ b/plc4go/spi/default/DefaultCodec_test.go
@@ -491,7 +491,7 @@ func Test_defaultCodec_Expect(t *testing.T) {
                        setup: func(t *testing.T, fields *fields, args *args) {
                                args.ctx = testutils.TestContext(t)
                                var cancelFunc context.CancelFunc
-                               args.ctx, cancelFunc = 
context.WithTimeout(args.ctx, 2*time.Second)
+                               args.ctx, cancelFunc = 
context.WithTimeout(args.ctx, 20*time.Second)
                                t.Cleanup(cancelFunc)
                        },
                },
@@ -883,7 +883,7 @@ func Test_defaultCodec_SendRequest(t *testing.T) {
 
                                args.ctx = testutils.TestContext(t)
                                var cancelFunc context.CancelFunc
-                               args.ctx, cancelFunc = 
context.WithTimeout(args.ctx, 2*time.Second)
+                               args.ctx, cancelFunc = 
context.WithTimeout(args.ctx, 20*time.Second)
                                t.Cleanup(cancelFunc)
                        },
                        wantErr: assert.NoError,
@@ -908,7 +908,7 @@ func Test_defaultCodec_SendRequest(t *testing.T) {
 
                                args.ctx = testutils.TestContext(t)
                                var cancelFunc context.CancelFunc
-                               args.ctx, cancelFunc = 
context.WithTimeout(args.ctx, 2*time.Second)
+                               args.ctx, cancelFunc = 
context.WithTimeout(args.ctx, 20*time.Second)
                                t.Cleanup(cancelFunc)
                        },
                        wantErr: assert.Error,
diff --git a/plc4go/spi/interceptors/SingleItemRequestInterceptor_test.go 
b/plc4go/spi/interceptors/SingleItemRequestInterceptor_test.go
index 58c244400a..ed6f7032e0 100644
--- a/plc4go/spi/interceptors/SingleItemRequestInterceptor_test.go
+++ b/plc4go/spi/interceptors/SingleItemRequestInterceptor_test.go
@@ -129,7 +129,7 @@ func 
TestSingleItemRequestInterceptor_InterceptReadRequest(t *testing.T) {
 
                                args.ctx = testutils.TestContext(t)
                                var cancelFunc context.CancelFunc
-                               args.ctx, cancelFunc = 
context.WithTimeout(args.ctx, 2*time.Second)
+                               args.ctx, cancelFunc = 
context.WithTimeout(args.ctx, 20*time.Second)
                                t.Cleanup(cancelFunc)
                        },
                        wantAssert: func(t *testing.T, args args, got 
[]apiModel.PlcReadRequest) bool {
@@ -285,7 +285,7 @@ func 
TestSingleItemRequestInterceptor_InterceptWriteRequest(t *testing.T) {
 
                                args.ctx = testutils.TestContext(t)
                                var cancelFunc context.CancelFunc
-                               args.ctx, cancelFunc = 
context.WithTimeout(args.ctx, 2*time.Second)
+                               args.ctx, cancelFunc = 
context.WithTimeout(args.ctx, 20*time.Second)
                                t.Cleanup(cancelFunc)
                        },
                        wantAssert: func(t *testing.T, args args, got 
[]apiModel.PlcWriteRequest) bool {
@@ -445,7 +445,7 @@ func 
TestSingleItemRequestInterceptor_ProcessReadResponses(t *testing.T) {
 
                                args.ctx = testutils.TestContext(t)
                                var cancelFunc context.CancelFunc
-                               args.ctx, cancelFunc = 
context.WithTimeout(args.ctx, 2*time.Second)
+                               args.ctx, cancelFunc = 
context.WithTimeout(args.ctx, 20*time.Second)
                                t.Cleanup(cancelFunc)
                        },
                        wantAssert: func(t *testing.T, args args, got 
apiModel.PlcReadRequestResult) bool {
@@ -476,7 +476,7 @@ func 
TestSingleItemRequestInterceptor_ProcessReadResponses(t *testing.T) {
 
                                args.ctx = testutils.TestContext(t)
                                var cancelFunc context.CancelFunc
-                               args.ctx, cancelFunc = 
context.WithTimeout(args.ctx, 2*time.Second)
+                               args.ctx, cancelFunc = 
context.WithTimeout(args.ctx, 20*time.Second)
                                t.Cleanup(cancelFunc)
                        },
                        wantAssert: func(t *testing.T, args args, got 
apiModel.PlcReadRequestResult) bool {
@@ -626,7 +626,7 @@ func 
TestSingleItemRequestInterceptor_ProcessWriteResponses(t *testing.T) {
 
                                args.ctx = testutils.TestContext(t)
                                var cancelFunc context.CancelFunc
-                               args.ctx, cancelFunc = 
context.WithTimeout(args.ctx, 2*time.Second)
+                               args.ctx, cancelFunc = 
context.WithTimeout(args.ctx, 20*time.Second)
                                t.Cleanup(cancelFunc)
                        },
                        wantAssert: func(t *testing.T, args args, got 
apiModel.PlcWriteRequestResult) bool {
@@ -655,7 +655,7 @@ func 
TestSingleItemRequestInterceptor_ProcessWriteResponses(t *testing.T) {
 
                                args.ctx = testutils.TestContext(t)
                                var cancelFunc context.CancelFunc
-                               args.ctx, cancelFunc = 
context.WithTimeout(args.ctx, 2*time.Second)
+                               args.ctx, cancelFunc = 
context.WithTimeout(args.ctx, 20*time.Second)
                                t.Cleanup(cancelFunc)
                        },
                        wantAssert: func(t *testing.T, args args, got 
apiModel.PlcWriteRequestResult) bool {
diff --git a/plc4go/spi/transports/test/TransportInstance.go 
b/plc4go/spi/transports/test/TransportInstance.go
index b937e7b608..2244a22529 100644
--- a/plc4go/spi/transports/test/TransportInstance.go
+++ b/plc4go/spi/transports/test/TransportInstance.go
@@ -20,6 +20,7 @@
 package test
 
 import (
+       "bufio"
        "context"
        "encoding/hex"
        "os"
@@ -36,18 +37,19 @@ import (
 )
 
 type TransportInstance struct {
-       readChannel      chan []byte
-       readBuffer       []byte
-       writeBuffer      []byte
-       transport        *Transport
+       readChannel chan []byte
+       readBuffer  []byte
+       writeBuffer []byte
+       dataMutex   sync.RWMutex
+
+       transport *Transport
+
        writeInterceptor func(transportInstance *TransportInstance, data []byte)
+       simulatedLatency time.Duration
 
-       dataMutex        sync.RWMutex
        connected        atomic.Bool
        stateChangeMutex sync.RWMutex
 
-       simulatedLatency time.Duration
-
        log zerolog.Logger
 }
 
@@ -151,11 +153,7 @@ func (m *TransportInstance) GetNumBytesAvailableInBuffer() 
(uint32, error) {
        if !m.IsConnected() {
                panic(errors.New("working on a unconnected connection"))
        }
-       m.dataMutex.RLock()
-       readableBytes := len(m.readBuffer)
-       m.dataMutex.RUnlock()
-       m.log.Trace().Int("readableBytes", readableBytes).Msg("return number of 
readable bytes")
-       return uint32(readableBytes), nil
+       return m.availableBytes(), nil
 }
 
 func (m *TransportInstance) FillBuffer(ctx context.Context, until func(pos 
uint, currentByte byte, reader transports.ExtendedReader) (keepGoing bool), 
timeout time.Duration) error {
@@ -220,19 +218,12 @@ func (m *TransportInstance) PeekReadableBytes(ctx 
context.Context, numBytes uint
        var err error
        if availableBytes < numBytes {
                m.log.Trace().Msg("not enough bytes available")
-               err = errors.New("not enough bytes available")
-       } else {
-               m.log.Trace().Msg("enough bytes available")
-       }
-       if availableBytes == 0 {
-               m.log.Trace().Msg("No bytes available")
-               return nil, err
+               return m.readBuffer[:], bufio.ErrBufferFull
        }
-       m.dataMutex.RLock()
-       peekAble := m.readBuffer[0:availableBytes]
-       m.dataMutex.RUnlock()
+       m.log.Trace().Msg("enough bytes available")
+       peekAble := m.peek()
        m.log.Trace().Int("peekAbleLen", len(peekAble)).Msg("New buffer size 
peekAbleLen")
-       return peekAble, err
+       return peekAble[:numBytes], err
 }
 
 func (m *TransportInstance) Read(ctx context.Context, numBytes uint32, timeout 
time.Duration) ([]byte, error) {
@@ -270,12 +261,7 @@ func (m *TransportInstance) Read(ctx context.Context, 
numBytes uint32, timeout t
        case <-ctx.Done():
        case <-timer.C:
        }
-       m.dataMutex.RLock()
-       data := m.readBuffer[0:int(numBytes)]
-       m.readBuffer = m.readBuffer[int(numBytes):]
-       m.dataMutex.RUnlock()
-       m.log.Trace().Uint32("availableBytes", availableBytes).Msg("New buffer 
size availableBytes")
-       return data, nil
+       return m.read(int(numBytes)), nil
 }
 
 func (m *TransportInstance) SetWriteInterceptor(writeInterceptor 
func(transportInstance *TransportInstance, data []byte)) {
@@ -370,9 +356,30 @@ func (m *TransportInstance) availableBytes() uint32 {
        return uint32(len(m.readBuffer))
 }
 
+func (m *TransportInstance) peek() []byte {
+       m.dataMutex.RLock()
+       defer m.dataMutex.RUnlock()
+       return m.readBuffer[0:len(m.readBuffer)]
+}
+
+func (m *TransportInstance) read(numBytes int) []byte {
+       m.dataMutex.Lock()
+       defer m.dataMutex.Unlock()
+       data := m.readBuffer[0:int(numBytes)]
+       m.readBuffer = m.readBuffer[int(numBytes):]
+       return data
+}
+
+func (m *TransportInstance) appendRead(newBytes ...byte) (totalAvailableBytes 
uint32) {
+       m.dataMutex.Lock()
+       defer m.dataMutex.Unlock()
+       m.readBuffer = append(m.readBuffer, newBytes...)
+       return uint32(len(m.readBuffer))
+}
+
 func (m *TransportInstance) transferFromChannel(ctx context.Context, timeout 
time.Duration) (totalAvailableBytes uint32) {
        m.log.Trace().Dur("timeout", timeout).Msg("Transfer from channel")
-       m.dataMutex.Lock()
+       totalAvailableBytes = m.availableBytes()
        timer := time.NewTimer(timeout)
        start := time.Now()
        select {
@@ -382,9 +389,7 @@ func (m *TransportInstance) transferFromChannel(ctx 
context.Context, timeout tim
                m.log.Trace().Msg("Timeout")
        case newBytes := <-m.readChannel:
                m.log.Trace().Dur("time", time.Since(start)).Msg("Got new 
bytes")
-               m.readBuffer = append(m.readBuffer, newBytes...)
+               totalAvailableBytes = m.appendRead(newBytes...)
        }
-       totalAvailableBytes = uint32(len(m.readBuffer))
-       m.dataMutex.Unlock()
        return totalAvailableBytes
 }
diff --git a/plc4go/spi/transports/test/TransportInstance_test.go 
b/plc4go/spi/transports/test/TransportInstance_test.go
index b7ffe17d64..ed32c26fe3 100644
--- a/plc4go/spi/transports/test/TransportInstance_test.go
+++ b/plc4go/spi/transports/test/TransportInstance_test.go
@@ -270,6 +270,7 @@ func TestTransportInstance_FillReadBuffer(t *testing.T) {
                                writeBuffer:      tt.fields.writeBuffer,
                                transport:        tt.fields.transport,
                                writeInterceptor: tt.fields.writeInterceptor,
+                               readChannel:      make(chan []byte, 1),
                        }
                        if tt.manipulator != nil {
                                tt.manipulator(t, m)
diff --git a/plc4go/spi/transports/test/Transport_test.go 
b/plc4go/spi/transports/test/Transport_test.go
index 45a2a33d19..22e089cbba 100644
--- a/plc4go/spi/transports/test/Transport_test.go
+++ b/plc4go/spi/transports/test/Transport_test.go
@@ -26,6 +26,7 @@ import (
 
        "github.com/rs/zerolog/log"
        "github.com/stretchr/testify/assert"
+       "github.com/stretchr/testify/require"
 
        "github.com/apache/plc4x/plc4go/spi/transports"
 )
@@ -106,23 +107,24 @@ func TestTransport_CreateTransportInstance(t *testing.T) {
                options      map[string][]string
        }
        tests := []struct {
-               name    string
-               fields  fields
-               args    args
-               want    transports.TransportInstance
-               wantErr bool
+               name       string
+               fields     fields
+               args       args
+               wantAssert func(*testing.T, transports.TransportInstance) bool
+               wantErr    bool
        }{
                {
                        name: "create it",
                        fields: fields{
                                preregisteredInstances: 
map[url.URL]transports.TransportInstance{},
                        },
-                       want: &TransportInstance{
-                               readBuffer:       []byte{},
-                               writeBuffer:      []byte{},
-                               transport:        NewTransport(),
-                               simulatedLatency: 100 * time.Millisecond,
-                               log:              log.Logger,
+                       wantAssert: func(t *testing.T, instance 
transports.TransportInstance) bool {
+                               require.NotNil(t, instance)
+                               assert.IsType(t, &TransportInstance{}, instance)
+                               ti := instance.(*TransportInstance)
+                               assert.NotNil(t, ti.transport)
+                               assert.Equal(t, 10*time.Millisecond, 
ti.simulatedLatency)
+                               return true
                        },
                },
                {
@@ -135,7 +137,9 @@ func TestTransport_CreateTransportInstance(t *testing.T) {
                        args: args{
                                transportUrl: url.URL{Host: "abcdefg"},
                        },
-                       want: nil,
+                       wantAssert: func(t *testing.T, instance 
transports.TransportInstance) bool {
+                               return assert.Nil(t, instance)
+                       },
                },
                {
                        name: "fail it on purpose",
@@ -144,6 +148,9 @@ func TestTransport_CreateTransportInstance(t *testing.T) {
                                        "failTestTransport": {"yes please"},
                                },
                        },
+                       wantAssert: func(t *testing.T, instance 
transports.TransportInstance) bool {
+                               return assert.Nil(t, instance)
+                       },
                        wantErr: true,
                },
        }
@@ -158,8 +165,8 @@ func TestTransport_CreateTransportInstance(t *testing.T) {
                                t.Errorf("CreateTransportInstance() error = %v, 
wantErr %v", err, tt.wantErr)
                                return
                        }
-                       if !assert.Equal(t, tt.want, got) {
-                               t.Errorf("CreateTransportInstance() got = %v, 
want %v", got, tt.want)
+                       if !assert.True(t, tt.wantAssert(t, got)) {
+                               t.Errorf("CreateTransportInstance() got = %v", 
got)
                        }
                })
        }
diff --git 
a/plc4go/spi/transports/test/transportInstanceDrivenExtendedReader.go 
b/plc4go/spi/transports/test/transportInstanceDrivenExtendedReader.go
index 0995fc113b..9fe0864bd1 100644
--- a/plc4go/spi/transports/test/transportInstanceDrivenExtendedReader.go
+++ b/plc4go/spi/transports/test/transportInstanceDrivenExtendedReader.go
@@ -47,9 +47,7 @@ func (t *transportInstanceDrivenExtendedReader) Read(p 
[]byte) (n int, err error
 
 func (t *transportInstanceDrivenExtendedReader) ReadByte() (byte, error) {
        numBytes := uint32(1)
-       t.dataMutex.RLock()
        availableBytes := t.availableBytes()
-       t.dataMutex.RUnlock()
        if availableBytes < numBytes {
                t.log.Trace().Uint32("numBytes", 
numBytes).Uint32("availableBytes", availableBytes).Msg("Trying transfer now")
                availableBytes = t.transferFromChannel(t.ctx, t.timeout)
@@ -61,9 +59,7 @@ func (t *transportInstanceDrivenExtendedReader) ReadByte() 
(byte, error) {
 
 func (t *transportInstanceDrivenExtendedReader) Peek(n int) ([]byte, error) {
        numBytes := uint32(n)
-       t.dataMutex.RLock()
        availableBytes := t.availableBytes()
-       t.dataMutex.RUnlock()
        if availableBytes < numBytes {
                t.log.Trace().Uint32("numBytes", 
numBytes).Uint32("availableBytes", availableBytes).Msg("Trying transfer now")
                availableBytes = t.transferFromChannel(t.ctx, t.timeout)


Reply via email to