This is an automated email from the ASF dual-hosted git repository. sruehl pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/plc4x.git
commit d3001a0c4edbfc6c62df8b4fa6ce0ae27d119b55 Author: Sebastian Rühl <[email protected]> AuthorDate: Fri Nov 14 11:05:41 2025 +0100 fix(plc4go/cbus): ensure we don't block too long on bufferfill --- plc4go/internal/cbus/Connection_test.go | 86 ++++++++++++------------- plc4go/internal/cbus/Driver_test.go | 2 +- plc4go/internal/cbus/MessageCodec.go | 4 +- plc4go/internal/cbus/MessageCodec_test.go | 12 ++-- plc4go/spi/transports/test/TransportInstance.go | 4 +- 5 files changed, 56 insertions(+), 52 deletions(-) diff --git a/plc4go/internal/cbus/Connection_test.go b/plc4go/internal/cbus/Connection_test.go index efbcadc373..fac6c4db37 100644 --- a/plc4go/internal/cbus/Connection_test.go +++ b/plc4go/internal/cbus/Connection_test.go @@ -174,7 +174,7 @@ func TestConnection_Connect(t *testing.T) { require.NoError(t, err) codec := NewMessageCodec(ti, _options...) t.Cleanup(func() { - assert.Error(t, codec.Disconnect()) + assert.NoError(t, codec.Disconnect()) }) fields.messageCodec = codec @@ -669,11 +669,11 @@ func TestConnection_sendCalDataWrite(t *testing.T) { requestContext *readWriteModel.RequestContext } tests := []struct { - name string - fields fields - args args - setup func(t *testing.T, fields *fields, args *args) - want bool + name string + fields fields + args args + setup func(t *testing.T, fields *fields, args *args) + wantErr assert.ErrorAssertionFunc }{ { name: "send something", @@ -693,7 +693,7 @@ func TestConnection_sendCalDataWrite(t *testing.T) { codec := NewMessageCodec(ti, _options...) require.NoError(t, codec.Connect(t.Context())) t.Cleanup(func() { - assert.Error(t, codec.Disconnect()) + assert.NoError(t, codec.Disconnect()) }) fields.messageCodec = codec @@ -702,7 +702,7 @@ func TestConnection_sendCalDataWrite(t *testing.T) { args.ctx, cancelFunc = context.WithTimeout(args.ctx, 2*time.Second) t.Cleanup(cancelFunc) }, - want: false, + wantErr: assert.Error, }, } for _, tt := range tests { @@ -721,7 +721,7 @@ func TestConnection_sendCalDataWrite(t *testing.T) { log: testutils.ProduceTestingLogger(t), } c.DefaultConnection = _default.NewDefaultConnection(c, testutils.EnrichOptionsWithOptionsForTesting(t)...) - assert.Equalf(t, tt.want, c.sendCalDataWrite(tt.args.ctx, tt.args.paramNo, tt.args.parameterValue, tt.args.requestContext), "sendCalDataWrite(%v, %v, %v, %v, %v, %v)", tt.args.ctx, tt.args.paramNo, tt.args.parameterValue, tt.args.requestContext) + assert.Truef(t, tt.wantErr(t, c.sendCalDataWrite(tt.args.ctx, tt.args.paramNo, tt.args.parameterValue, tt.args.requestContext)), "sendCalDataWrite(%v, %v, %v, %v, %v, %v)", tt.args.ctx, tt.args.paramNo, tt.args.parameterValue, tt.args.requestContext) }) } } @@ -805,11 +805,11 @@ func TestConnection_setApplicationFilter(t *testing.T) { requestContext *readWriteModel.RequestContext } tests := []struct { - name string - fields fields - args args - setup func(t *testing.T, fields *fields, args *args) - wantOk bool + name string + fields fields + args args + setup func(t *testing.T, fields *fields, args *args) + wantErr assert.ErrorAssertionFunc }{ { name: "set application filter (failing)", @@ -830,7 +830,7 @@ func TestConnection_setApplicationFilter(t *testing.T) { codec := NewMessageCodec(ti, _options...) require.NoError(t, codec.Connect(t.Context())) t.Cleanup(func() { - assert.Error(t, codec.Disconnect()) + assert.NoError(t, codec.Disconnect()) }) fields.messageCodec = codec @@ -839,7 +839,7 @@ func TestConnection_setApplicationFilter(t *testing.T) { args.ctx, cancelFunc = context.WithTimeout(args.ctx, 2*time.Second) t.Cleanup(cancelFunc) }, - wantOk: false, + wantErr: assert.Error, }, } for _, tt := range tests { @@ -858,7 +858,7 @@ func TestConnection_setApplicationFilter(t *testing.T) { log: testutils.ProduceTestingLogger(t), } c.DefaultConnection = _default.NewDefaultConnection(c, testutils.EnrichOptionsWithOptionsForTesting(t)...) - assert.Equalf(t, tt.wantOk, c.setApplicationFilter(tt.args.ctx, tt.args.requestContext), "setApplicationFilter(%v, %v)", tt.args.ctx, tt.args.requestContext) + assert.Truef(t, tt.wantErr(t, c.setApplicationFilter(tt.args.ctx, tt.args.requestContext)), "setApplicationFilter(%v, %v)", tt.args.ctx, tt.args.requestContext) }) } } @@ -878,11 +878,11 @@ func TestConnection_setInterface1PowerUpSettings(t *testing.T) { cbusOptions *readWriteModel.CBusOptions } tests := []struct { - name string - fields fields - args args - setup func(t *testing.T, fields *fields, args *args) - wantOk bool + name string + fields fields + args args + setup func(t *testing.T, fields *fields, args *args) + wantErr assert.ErrorAssertionFunc }{ { name: "set interface 1 PUN options (failing)", @@ -908,7 +908,7 @@ func TestConnection_setInterface1PowerUpSettings(t *testing.T) { require.NoError(t, codec.Connect(t.Context())) t.Cleanup(func() { t.Log("disconnecting codec") - assert.Error(t, codec.Disconnect()) + assert.NoError(t, codec.Disconnect()) }) fields.messageCodec = codec @@ -917,7 +917,7 @@ func TestConnection_setInterface1PowerUpSettings(t *testing.T) { args.ctx, cancelFunc = context.WithTimeout(args.ctx, 2*time.Second) t.Cleanup(cancelFunc) }, - wantOk: false, + wantErr: assert.Error, }, } for _, tt := range tests { @@ -936,7 +936,7 @@ func TestConnection_setInterface1PowerUpSettings(t *testing.T) { log: testutils.ProduceTestingLogger(t), } c.DefaultConnection = _default.NewDefaultConnection(c, testutils.EnrichOptionsWithOptionsForTesting(t)...) - assert.Equalf(t, tt.wantOk, c.setInterface1PowerUpSettings(tt.args.ctx, tt.args.requestContext, tt.args.cbusOptions), "setInterface1PowerUpSettings(%v, %v, %v)", tt.args.ctx, tt.args.requestContext, tt.args.cbusOptions) + assert.Truef(t, tt.wantErr(t, c.setInterface1PowerUpSettings(tt.args.ctx, tt.args.requestContext, tt.args.cbusOptions)), "setInterface1PowerUpSettings(%v, %v, %v)", tt.args.ctx, tt.args.requestContext, tt.args.cbusOptions) }) } } @@ -956,11 +956,11 @@ func TestConnection_setInterfaceOptions1(t *testing.T) { cbusOptions *readWriteModel.CBusOptions } tests := []struct { - name string - fields fields - args args - setup func(t *testing.T, fields *fields, args *args) - want bool + name string + fields fields + args args + setup func(t *testing.T, fields *fields, args *args) + wantErr assert.ErrorAssertionFunc }{ { name: "set interface 1 options (failing)", @@ -984,7 +984,7 @@ func TestConnection_setInterfaceOptions1(t *testing.T) { codec := NewMessageCodec(ti, _options...) require.NoError(t, codec.Connect(t.Context())) t.Cleanup(func() { - assert.Error(t, codec.Disconnect()) + assert.NoError(t, codec.Disconnect()) }) fields.messageCodec = codec @@ -993,7 +993,7 @@ func TestConnection_setInterfaceOptions1(t *testing.T) { args.ctx, cancelFunc = context.WithTimeout(args.ctx, 2*time.Second) t.Cleanup(cancelFunc) }, - want: false, + wantErr: assert.Error, }, } for _, tt := range tests { @@ -1012,7 +1012,7 @@ func TestConnection_setInterfaceOptions1(t *testing.T) { log: testutils.ProduceTestingLogger(t), } c.DefaultConnection = _default.NewDefaultConnection(c, testutils.EnrichOptionsWithOptionsForTesting(t)...) - assert.Equalf(t, tt.want, c.setInterfaceOptions1(tt.args.ctx, tt.args.requestContext, tt.args.cbusOptions), "setInterfaceOptions1(%v, %v, %v)", tt.args.ctx, tt.args.requestContext, tt.args.cbusOptions) + assert.Truef(t, tt.wantErr(t, c.setInterfaceOptions1(tt.args.ctx, tt.args.requestContext, tt.args.cbusOptions)), "setInterfaceOptions1(%v, %v, %v)", tt.args.ctx, tt.args.requestContext, tt.args.cbusOptions) }) } } @@ -1032,11 +1032,11 @@ func TestConnection_setInterfaceOptions3(t *testing.T) { cbusOptions *readWriteModel.CBusOptions } tests := []struct { - name string - fields fields - args args - setup func(t *testing.T, fields *fields, args *args) - wantOk bool + name string + fields fields + args args + setup func(t *testing.T, fields *fields, args *args) + wantErr assert.ErrorAssertionFunc }{ { name: "set interface 3 options (failing)", @@ -1060,7 +1060,7 @@ func TestConnection_setInterfaceOptions3(t *testing.T) { codec := NewMessageCodec(ti, _options...) require.NoError(t, codec.Connect(t.Context())) t.Cleanup(func() { - assert.Error(t, codec.Disconnect()) + assert.NoError(t, codec.Disconnect()) }) fields.messageCodec = codec @@ -1069,7 +1069,7 @@ func TestConnection_setInterfaceOptions3(t *testing.T) { args.ctx, cancelFunc = context.WithTimeout(args.ctx, 2*time.Second) t.Cleanup(cancelFunc) }, - wantOk: false, + wantErr: assert.Error, }, } for _, tt := range tests { @@ -1088,7 +1088,7 @@ func TestConnection_setInterfaceOptions3(t *testing.T) { log: testutils.ProduceTestingLogger(t), } c.DefaultConnection = _default.NewDefaultConnection(c, testutils.EnrichOptionsWithOptionsForTesting(t)...) - assert.Equalf(t, tt.wantOk, c.setInterfaceOptions3(tt.args.ctx, tt.args.requestContext, tt.args.cbusOptions), "setInterfaceOptions3(%v, %v, %v)", tt.args.ctx, tt.args.requestContext, tt.args.cbusOptions) + assert.Truef(t, tt.wantErr(t, c.setInterfaceOptions3(tt.args.ctx, tt.args.requestContext, tt.args.cbusOptions)), "setInterfaceOptions3(%v, %v, %v)", tt.args.ctx, tt.args.requestContext, tt.args.cbusOptions) }) } } @@ -1480,9 +1480,7 @@ func TestConnection_setupConnection(t *testing.T) { } c.DefaultConnection = _default.NewDefaultConnection(c, testutils.EnrichOptionsWithOptionsForTesting(t)...) err := c.setupConnection(tt.args.ctx) - if tt.wantErr(t, err) { - t.FailNow() - } + tt.wantErr(t, err) }) } } diff --git a/plc4go/internal/cbus/Driver_test.go b/plc4go/internal/cbus/Driver_test.go index d49318b222..d3509cd178 100644 --- a/plc4go/internal/cbus/Driver_test.go +++ b/plc4go/internal/cbus/Driver_test.go @@ -105,7 +105,7 @@ func TestDriver_GetConnection(t *testing.T) { }, wantVerifier: func(t *testing.T, conn plc4go.PlcConnection, err error) bool { assert.Error(t, err) - assert.Equal(t, "couldn't initialize transport configuration for given transport url test:: test transport failed on purpose", err) + assert.ErrorContains(t, err, "couldn't initialize transport configuration for given transport url test:: test transport failed on purpose") return true }, }, diff --git a/plc4go/internal/cbus/MessageCodec.go b/plc4go/internal/cbus/MessageCodec.go index f1e266674f..7de6aa4d6c 100644 --- a/plc4go/internal/cbus/MessageCodec.go +++ b/plc4go/internal/cbus/MessageCodec.go @@ -133,7 +133,8 @@ func (m *MessageCodec) Receive(ctx context.Context) (spi.Message, error) { confirmation := false // Fill the buffer { - if err := ti.FillBuffer(ctx, func(pos uint, currentByte byte, reader transports.ExtendedReader) bool { + fillCtx, fillCtxCancel := context.WithTimeout(ctx, 100*time.Millisecond) + if err := ti.FillBuffer(fillCtx, func(pos uint, currentByte byte, reader transports.ExtendedReader) bool { switch currentByte { case readWriteModel.ResponseTermination_CR, @@ -160,6 +161,7 @@ func (m *MessageCodec) Receive(ctx context.Context) (spi.Message, error) { } else { m.log.Trace().Msg("Buffer filled") } + fillCtxCancel() } // Check how many readable bytes we have diff --git a/plc4go/internal/cbus/MessageCodec_test.go b/plc4go/internal/cbus/MessageCodec_test.go index cf6d7b66cd..bbfcd40e6c 100644 --- a/plc4go/internal/cbus/MessageCodec_test.go +++ b/plc4go/internal/cbus/MessageCodec_test.go @@ -695,8 +695,13 @@ func TestMessageCodec_Receive_Delayed_Response(t *testing.T) { }) codec.requestContext = readWriteModel.NewRequestContext(true) + canceledCtx := func() context.Context { + ctx, cancelFunc := context.WithCancel(t.Context()) + cancelFunc() + return ctx + } var msg spi.Message - msg, err = codec.Receive(t.Context()) + msg, err = codec.Receive(canceledCtx()) // No data yet so this should return no error and no data assert.NoError(t, err) assert.Nil(t, msg) @@ -706,7 +711,7 @@ func TestMessageCodec_Receive_Delayed_Response(t *testing.T) { for i := 0; i < 8; i++ { t.Logf("%d try", i+1) // We should wait for more data, so no error, no message - msg, err = codec.Receive(t.Context()) + msg, err = codec.Receive(canceledCtx()) assert.NoError(t, err) assert.Nil(t, msg) } @@ -730,7 +735,6 @@ func TestMessageCodec_Receive_Delayed_Response(t *testing.T) { require.NoError(t, err) require.NoError(t, ti.Connect(t.Context())) - ti.(*test.TransportInstance).FillReadBuffer([]byte("@A62120\r@A62120\r")) codec := NewMessageCodec(ti, _options...) t.Cleanup(func() { assert.Error(t, codec.Disconnect()) @@ -750,7 +754,7 @@ func TestMessageCodec_Receive_Delayed_Response(t *testing.T) { // We should wait for more data, so no error, no message msg, err = codec.Receive(t.Context()) if i == 15 { - assert.NoError(t, err) + require.NoError(t, err) require.NotNil(t, msg) // This should be the confirmation only ... reply := msg.(readWriteModel.CBusMessageToClient).GetReply() diff --git a/plc4go/spi/transports/test/TransportInstance.go b/plc4go/spi/transports/test/TransportInstance.go index 02f1320eb2..2ed45e9af9 100644 --- a/plc4go/spi/transports/test/TransportInstance.go +++ b/plc4go/spi/transports/test/TransportInstance.go @@ -335,8 +335,8 @@ func (m *TransportInstance) peek() []byte { func (m *TransportInstance) read(numBytes int) []byte { m.dataMutex.Lock() defer m.dataMutex.Unlock() - data := m.readBuffer[0:int(numBytes)] - m.readBuffer = m.readBuffer[int(numBytes):] + data := m.readBuffer[0:numBytes] + m.readBuffer = m.readBuffer[numBytes:] return data }
