This is an automated email from the ASF dual-hosted git repository.

sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git

commit d3001a0c4edbfc6c62df8b4fa6ce0ae27d119b55
Author: Sebastian Rühl <[email protected]>
AuthorDate: Fri Nov 14 11:05:41 2025 +0100

    fix(plc4go/cbus): ensure we don't block too long on bufferfill
---
 plc4go/internal/cbus/Connection_test.go         | 86 ++++++++++++-------------
 plc4go/internal/cbus/Driver_test.go             |  2 +-
 plc4go/internal/cbus/MessageCodec.go            |  4 +-
 plc4go/internal/cbus/MessageCodec_test.go       | 12 ++--
 plc4go/spi/transports/test/TransportInstance.go |  4 +-
 5 files changed, 56 insertions(+), 52 deletions(-)

diff --git a/plc4go/internal/cbus/Connection_test.go b/plc4go/internal/cbus/Connection_test.go
index efbcadc373..fac6c4db37 100644
--- a/plc4go/internal/cbus/Connection_test.go
+++ b/plc4go/internal/cbus/Connection_test.go
@@ -174,7 +174,7 @@ func TestConnection_Connect(t *testing.T) {
                                require.NoError(t, err)
                                codec := NewMessageCodec(ti, _options...)
                                t.Cleanup(func() {
-                                       assert.Error(t, codec.Disconnect())
+                                       assert.NoError(t, codec.Disconnect())
                                })
                                fields.messageCodec = codec
 
@@ -669,11 +669,11 @@ func TestConnection_sendCalDataWrite(t *testing.T) {
                requestContext *readWriteModel.RequestContext
        }
        tests := []struct {
-               name   string
-               fields fields
-               args   args
-               setup  func(t *testing.T, fields *fields, args *args)
-               want   bool
+               name    string
+               fields  fields
+               args    args
+               setup   func(t *testing.T, fields *fields, args *args)
+               wantErr assert.ErrorAssertionFunc
        }{
                {
                        name: "send something",
@@ -693,7 +693,7 @@ func TestConnection_sendCalDataWrite(t *testing.T) {
                                codec := NewMessageCodec(ti, _options...)
                                require.NoError(t, codec.Connect(t.Context()))
                                t.Cleanup(func() {
-                                       assert.Error(t, codec.Disconnect())
+                                       assert.NoError(t, codec.Disconnect())
                                })
                                fields.messageCodec = codec
 
@@ -702,7 +702,7 @@ func TestConnection_sendCalDataWrite(t *testing.T) {
                                args.ctx, cancelFunc = 
context.WithTimeout(args.ctx, 2*time.Second)
                                t.Cleanup(cancelFunc)
                        },
-                       want: false,
+                       wantErr: assert.Error,
                },
        }
        for _, tt := range tests {
@@ -721,7 +721,7 @@ func TestConnection_sendCalDataWrite(t *testing.T) {
                                log:           
testutils.ProduceTestingLogger(t),
                        }
                        c.DefaultConnection = _default.NewDefaultConnection(c, 
testutils.EnrichOptionsWithOptionsForTesting(t)...)
-                       assert.Equalf(t, tt.want, 
c.sendCalDataWrite(tt.args.ctx, tt.args.paramNo, tt.args.parameterValue, 
tt.args.requestContext), "sendCalDataWrite(%v, %v, %v, %v, %v, %v)", 
tt.args.ctx, tt.args.paramNo, tt.args.parameterValue, tt.args.requestContext)
+                       assert.Truef(t, tt.wantErr(t, 
c.sendCalDataWrite(tt.args.ctx, tt.args.paramNo, tt.args.parameterValue, 
tt.args.requestContext)), "sendCalDataWrite(%v, %v, %v, %v, %v, %v)", 
tt.args.ctx, tt.args.paramNo, tt.args.parameterValue, tt.args.requestContext)
                })
        }
 }
@@ -805,11 +805,11 @@ func TestConnection_setApplicationFilter(t *testing.T) {
                requestContext *readWriteModel.RequestContext
        }
        tests := []struct {
-               name   string
-               fields fields
-               args   args
-               setup  func(t *testing.T, fields *fields, args *args)
-               wantOk bool
+               name    string
+               fields  fields
+               args    args
+               setup   func(t *testing.T, fields *fields, args *args)
+               wantErr assert.ErrorAssertionFunc
        }{
                {
                        name: "set application filter (failing)",
@@ -830,7 +830,7 @@ func TestConnection_setApplicationFilter(t *testing.T) {
                                codec := NewMessageCodec(ti, _options...)
                                require.NoError(t, codec.Connect(t.Context()))
                                t.Cleanup(func() {
-                                       assert.Error(t, codec.Disconnect())
+                                       assert.NoError(t, codec.Disconnect())
                                })
                                fields.messageCodec = codec
 
@@ -839,7 +839,7 @@ func TestConnection_setApplicationFilter(t *testing.T) {
                                args.ctx, cancelFunc = 
context.WithTimeout(args.ctx, 2*time.Second)
                                t.Cleanup(cancelFunc)
                        },
-                       wantOk: false,
+                       wantErr: assert.Error,
                },
        }
        for _, tt := range tests {
@@ -858,7 +858,7 @@ func TestConnection_setApplicationFilter(t *testing.T) {
                                log:           
testutils.ProduceTestingLogger(t),
                        }
                        c.DefaultConnection = _default.NewDefaultConnection(c, 
testutils.EnrichOptionsWithOptionsForTesting(t)...)
-                       assert.Equalf(t, tt.wantOk, 
c.setApplicationFilter(tt.args.ctx, tt.args.requestContext), 
"setApplicationFilter(%v, %v)", tt.args.ctx, tt.args.requestContext)
+                       assert.Truef(t, tt.wantErr(t, 
c.setApplicationFilter(tt.args.ctx, tt.args.requestContext)), 
"setApplicationFilter(%v, %v)", tt.args.ctx, tt.args.requestContext)
                })
        }
 }
@@ -878,11 +878,11 @@ func TestConnection_setInterface1PowerUpSettings(t 
*testing.T) {
                cbusOptions    *readWriteModel.CBusOptions
        }
        tests := []struct {
-               name   string
-               fields fields
-               args   args
-               setup  func(t *testing.T, fields *fields, args *args)
-               wantOk bool
+               name    string
+               fields  fields
+               args    args
+               setup   func(t *testing.T, fields *fields, args *args)
+               wantErr assert.ErrorAssertionFunc
        }{
                {
                        name: "set interface 1 PUN options (failing)",
@@ -908,7 +908,7 @@ func TestConnection_setInterface1PowerUpSettings(t 
*testing.T) {
                                require.NoError(t, codec.Connect(t.Context()))
                                t.Cleanup(func() {
                                        t.Log("disconnecting codec")
-                                       assert.Error(t, codec.Disconnect())
+                                       assert.NoError(t, codec.Disconnect())
                                })
                                fields.messageCodec = codec
 
@@ -917,7 +917,7 @@ func TestConnection_setInterface1PowerUpSettings(t 
*testing.T) {
                                args.ctx, cancelFunc = 
context.WithTimeout(args.ctx, 2*time.Second)
                                t.Cleanup(cancelFunc)
                        },
-                       wantOk: false,
+                       wantErr: assert.Error,
                },
        }
        for _, tt := range tests {
@@ -936,7 +936,7 @@ func TestConnection_setInterface1PowerUpSettings(t 
*testing.T) {
                                log:           
testutils.ProduceTestingLogger(t),
                        }
                        c.DefaultConnection = _default.NewDefaultConnection(c, 
testutils.EnrichOptionsWithOptionsForTesting(t)...)
-                       assert.Equalf(t, tt.wantOk, 
c.setInterface1PowerUpSettings(tt.args.ctx, tt.args.requestContext, 
tt.args.cbusOptions), "setInterface1PowerUpSettings(%v, %v, %v)", tt.args.ctx, 
tt.args.requestContext, tt.args.cbusOptions)
+                       assert.Truef(t, tt.wantErr(t, 
c.setInterface1PowerUpSettings(tt.args.ctx, tt.args.requestContext, 
tt.args.cbusOptions)), "setInterface1PowerUpSettings(%v, %v, %v)", tt.args.ctx, 
tt.args.requestContext, tt.args.cbusOptions)
                })
        }
 }
@@ -956,11 +956,11 @@ func TestConnection_setInterfaceOptions1(t *testing.T) {
                cbusOptions    *readWriteModel.CBusOptions
        }
        tests := []struct {
-               name   string
-               fields fields
-               args   args
-               setup  func(t *testing.T, fields *fields, args *args)
-               want   bool
+               name    string
+               fields  fields
+               args    args
+               setup   func(t *testing.T, fields *fields, args *args)
+               wantErr assert.ErrorAssertionFunc
        }{
                {
                        name: "set interface 1 options (failing)",
@@ -984,7 +984,7 @@ func TestConnection_setInterfaceOptions1(t *testing.T) {
                                codec := NewMessageCodec(ti, _options...)
                                require.NoError(t, codec.Connect(t.Context()))
                                t.Cleanup(func() {
-                                       assert.Error(t, codec.Disconnect())
+                                       assert.NoError(t, codec.Disconnect())
                                })
                                fields.messageCodec = codec
 
@@ -993,7 +993,7 @@ func TestConnection_setInterfaceOptions1(t *testing.T) {
                                args.ctx, cancelFunc = 
context.WithTimeout(args.ctx, 2*time.Second)
                                t.Cleanup(cancelFunc)
                        },
-                       want: false,
+                       wantErr: assert.Error,
                },
        }
        for _, tt := range tests {
@@ -1012,7 +1012,7 @@ func TestConnection_setInterfaceOptions1(t *testing.T) {
                                log:           
testutils.ProduceTestingLogger(t),
                        }
                        c.DefaultConnection = _default.NewDefaultConnection(c, 
testutils.EnrichOptionsWithOptionsForTesting(t)...)
-                       assert.Equalf(t, tt.want, 
c.setInterfaceOptions1(tt.args.ctx, tt.args.requestContext, 
tt.args.cbusOptions), "setInterfaceOptions1(%v, %v, %v)", tt.args.ctx, 
tt.args.requestContext, tt.args.cbusOptions)
+                       assert.Truef(t, tt.wantErr(t, 
c.setInterfaceOptions1(tt.args.ctx, tt.args.requestContext, 
tt.args.cbusOptions)), "setInterfaceOptions1(%v, %v, %v)", tt.args.ctx, 
tt.args.requestContext, tt.args.cbusOptions)
                })
        }
 }
@@ -1032,11 +1032,11 @@ func TestConnection_setInterfaceOptions3(t *testing.T) {
                cbusOptions    *readWriteModel.CBusOptions
        }
        tests := []struct {
-               name   string
-               fields fields
-               args   args
-               setup  func(t *testing.T, fields *fields, args *args)
-               wantOk bool
+               name    string
+               fields  fields
+               args    args
+               setup   func(t *testing.T, fields *fields, args *args)
+               wantErr assert.ErrorAssertionFunc
        }{
                {
                        name: "set interface 3 options (failing)",
@@ -1060,7 +1060,7 @@ func TestConnection_setInterfaceOptions3(t *testing.T) {
                                codec := NewMessageCodec(ti, _options...)
                                require.NoError(t, codec.Connect(t.Context()))
                                t.Cleanup(func() {
-                                       assert.Error(t, codec.Disconnect())
+                                       assert.NoError(t, codec.Disconnect())
                                })
                                fields.messageCodec = codec
 
@@ -1069,7 +1069,7 @@ func TestConnection_setInterfaceOptions3(t *testing.T) {
                                args.ctx, cancelFunc = 
context.WithTimeout(args.ctx, 2*time.Second)
                                t.Cleanup(cancelFunc)
                        },
-                       wantOk: false,
+                       wantErr: assert.Error,
                },
        }
        for _, tt := range tests {
@@ -1088,7 +1088,7 @@ func TestConnection_setInterfaceOptions3(t *testing.T) {
                                log:           
testutils.ProduceTestingLogger(t),
                        }
                        c.DefaultConnection = _default.NewDefaultConnection(c, 
testutils.EnrichOptionsWithOptionsForTesting(t)...)
-                       assert.Equalf(t, tt.wantOk, 
c.setInterfaceOptions3(tt.args.ctx, tt.args.requestContext, 
tt.args.cbusOptions), "setInterfaceOptions3(%v, %v, %v)", tt.args.ctx, 
tt.args.requestContext, tt.args.cbusOptions)
+                       assert.Truef(t, tt.wantErr(t, 
c.setInterfaceOptions3(tt.args.ctx, tt.args.requestContext, 
tt.args.cbusOptions)), "setInterfaceOptions3(%v, %v, %v)", tt.args.ctx, 
tt.args.requestContext, tt.args.cbusOptions)
                })
        }
 }
@@ -1480,9 +1480,7 @@ func TestConnection_setupConnection(t *testing.T) {
                        }
                        c.DefaultConnection = _default.NewDefaultConnection(c, 
testutils.EnrichOptionsWithOptionsForTesting(t)...)
                        err := c.setupConnection(tt.args.ctx)
-                       if tt.wantErr(t, err) {
-                               t.FailNow()
-                       }
+                       tt.wantErr(t, err)
                })
        }
 }
diff --git a/plc4go/internal/cbus/Driver_test.go b/plc4go/internal/cbus/Driver_test.go
index d49318b222..d3509cd178 100644
--- a/plc4go/internal/cbus/Driver_test.go
+++ b/plc4go/internal/cbus/Driver_test.go
@@ -105,7 +105,7 @@ func TestDriver_GetConnection(t *testing.T) {
                        },
                        wantVerifier: func(t *testing.T, conn 
plc4go.PlcConnection, err error) bool {
                                assert.Error(t, err)
-                               assert.Equal(t, "couldn't initialize transport configuration for given transport url test:: test transport failed on purpose", err)
+                               assert.ErrorContains(t, err, "couldn't initialize transport configuration for given transport url test:: test transport failed on purpose")
                                return true
                        },
                },
diff --git a/plc4go/internal/cbus/MessageCodec.go b/plc4go/internal/cbus/MessageCodec.go
index f1e266674f..7de6aa4d6c 100644
--- a/plc4go/internal/cbus/MessageCodec.go
+++ b/plc4go/internal/cbus/MessageCodec.go
@@ -133,7 +133,8 @@ func (m *MessageCodec) Receive(ctx context.Context) (spi.Message, error) {
        confirmation := false
        // Fill the buffer
        {
-               if err := ti.FillBuffer(ctx, func(pos uint, currentByte byte, reader transports.ExtendedReader) bool {
+               fillCtx, fillCtxCancel := context.WithTimeout(ctx, 100*time.Millisecond)
+               if err := ti.FillBuffer(fillCtx, func(pos uint, currentByte byte, reader transports.ExtendedReader) bool {
                        switch currentByte {
                        case
                                readWriteModel.ResponseTermination_CR,
@@ -160,6 +161,7 @@ func (m *MessageCodec) Receive(ctx context.Context) (spi.Message, error) {
                } else {
                        m.log.Trace().Msg("Buffer filled")
                }
+               fillCtxCancel()
        }
 
        // Check how many readable bytes we have
diff --git a/plc4go/internal/cbus/MessageCodec_test.go b/plc4go/internal/cbus/MessageCodec_test.go
index cf6d7b66cd..bbfcd40e6c 100644
--- a/plc4go/internal/cbus/MessageCodec_test.go
+++ b/plc4go/internal/cbus/MessageCodec_test.go
@@ -695,8 +695,13 @@ func TestMessageCodec_Receive_Delayed_Response(t 
*testing.T) {
                })
                codec.requestContext = readWriteModel.NewRequestContext(true)
 
+               canceledCtx := func() context.Context {
+                       ctx, cancelFunc := context.WithCancel(t.Context())
+                       cancelFunc()
+                       return ctx
+               }
                var msg spi.Message
-               msg, err = codec.Receive(t.Context())
+               msg, err = codec.Receive(canceledCtx())
                // No data yet so this should return no error and no data
                assert.NoError(t, err)
                assert.Nil(t, msg)
@@ -706,7 +711,7 @@ func TestMessageCodec_Receive_Delayed_Response(t 
*testing.T) {
                for i := 0; i < 8; i++ {
                        t.Logf("%d try", i+1)
                        // We should wait for more data, so no error, no message
-                       msg, err = codec.Receive(t.Context())
+                       msg, err = codec.Receive(canceledCtx())
                        assert.NoError(t, err)
                        assert.Nil(t, msg)
                }
@@ -730,7 +735,6 @@ func TestMessageCodec_Receive_Delayed_Response(t 
*testing.T) {
                require.NoError(t, err)
 
                require.NoError(t, ti.Connect(t.Context()))
-               ti.(*test.TransportInstance).FillReadBuffer([]byte("@A62120\r@A62120\r"))
                codec := NewMessageCodec(ti, _options...)
                t.Cleanup(func() {
                        assert.Error(t, codec.Disconnect())
@@ -750,7 +754,7 @@ func TestMessageCodec_Receive_Delayed_Response(t 
*testing.T) {
                        // We should wait for more data, so no error, no message
                        msg, err = codec.Receive(t.Context())
                        if i == 15 {
-                               assert.NoError(t, err)
+                               require.NoError(t, err)
                                require.NotNil(t, msg)
                                // This should be the confirmation only ...
                                reply := 
msg.(readWriteModel.CBusMessageToClient).GetReply()
diff --git a/plc4go/spi/transports/test/TransportInstance.go b/plc4go/spi/transports/test/TransportInstance.go
index 02f1320eb2..2ed45e9af9 100644
--- a/plc4go/spi/transports/test/TransportInstance.go
+++ b/plc4go/spi/transports/test/TransportInstance.go
@@ -335,8 +335,8 @@ func (m *TransportInstance) peek() []byte {
 func (m *TransportInstance) read(numBytes int) []byte {
        m.dataMutex.Lock()
        defer m.dataMutex.Unlock()
-       data := m.readBuffer[0:int(numBytes)]
-       m.readBuffer = m.readBuffer[int(numBytes):]
+       data := m.readBuffer[0:numBytes]
+       m.readBuffer = m.readBuffer[numBytes:]
        return data
 }
 

Reply via email to