This is an automated email from the ASF dual-hosted git repository.

sruehl pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/plc4x.git
commit 68e9b54b49cbff845a555f7c49d8b8fcc8ccf643 Author: Sebastian Rühl <[email protected]> AuthorDate: Thu Nov 13 16:09:18 2025 +0100 test(plc4go): fix unstable tests --- plc4go/internal/cbus/Browser_test.go | 1 + plc4go/internal/cbus/Connection.go | 6 ++ plc4go/internal/cbus/Connection_test.go | 10 +- plc4go/internal/cbus/MessageCodec_test.go | 168 ++++++++++++++++++------------ 4 files changed, 116 insertions(+), 69 deletions(-) diff --git a/plc4go/internal/cbus/Browser_test.go b/plc4go/internal/cbus/Browser_test.go index fe6b731dd0..aeda64d9d6 100644 --- a/plc4go/internal/cbus/Browser_test.go +++ b/plc4go/internal/cbus/Browser_test.go @@ -589,6 +589,7 @@ func TestBrowser_getInstalledUnitAddressBytes(t *testing.T) { require.NoError(t, connectionConnectResult.GetErr()) fields.connection = connectionConnectResult.GetConnection() t.Cleanup(func() { + t.Log("shutting down connection") t.Log(fields.connection.BlockingClose(t.Context())) }) diff --git a/plc4go/internal/cbus/Connection.go b/plc4go/internal/cbus/Connection.go index d384d1ced2..d7d0a76b0f 100644 --- a/plc4go/internal/cbus/Connection.go +++ b/plc4go/internal/cbus/Connection.go @@ -234,6 +234,8 @@ func (c *Connection) setupConnection(ctx context.Context, ch chan plc4go.PlcConn cbusOptions := &c.messageCodec.cbusOptions requestContext := &c.messageCodec.requestContext + c.log.Trace().Msg("Starting connection setup") + c.log.Trace().Msg("Sending reset") if !c.sendReset(ctx, ch, false) { c.log.Warn().Msg("First reset failed") // We try a second reset in case we get a power up @@ -242,18 +244,22 @@ func (c *Connection) setupConnection(ctx context.Context, ch chan plc4go.PlcConn return } } + c.log.Trace().Msg("setting application filter") if !c.setApplicationFilter(ctx, ch, requestContext, cbusOptions) { c.log.Trace().Msg("Set application filter failed") return } + c.log.Trace().Msg("Setting interface options 3") if !c.setInterfaceOptions3(ctx, ch, requestContext, cbusOptions) { c.log.Trace().Msg("Set interface 
options 3 failed") return } + c.log.Trace().Msg("Setting interface options 1 power up settings") if !c.setInterface1PowerUpSettings(ctx, ch, requestContext, cbusOptions) { c.log.Trace().Msg("Set interface options 1 power up settings failed") return } + c.log.Trace().Msg("Setting interface options 1") if !c.setInterfaceOptions1(ctx, ch, requestContext, cbusOptions) { c.log.Trace().Msg("Set interface options 1 failed") return diff --git a/plc4go/internal/cbus/Connection_test.go b/plc4go/internal/cbus/Connection_test.go index 10f2bf973a..ebe5b4caa7 100644 --- a/plc4go/internal/cbus/Connection_test.go +++ b/plc4go/internal/cbus/Connection_test.go @@ -918,7 +918,7 @@ func TestConnection_sendReset(t *testing.T) { args.ctx = testutils.TestContext(t) var cancelFunc context.CancelFunc - args.ctx, cancelFunc = context.WithTimeout(args.ctx, 20*time.Second) + args.ctx, cancelFunc = context.WithTimeout(args.ctx, 2*time.Second) t.Cleanup(cancelFunc) }, wantOk: false, @@ -1575,7 +1575,8 @@ func TestConnection_setupConnection(t *testing.T) { // Build the message codec transport := test.NewTransport(_options...) - ti, err := transport.CreateTransportInstance(url.URL{Scheme: "test"}, map[string][]string{"simulatedLatency": {"10ms"}}, _options...) + ti, err := transport.CreateTransportInstance(url.URL{Scheme: "test"}, map[string][]string{"simulatedLatency": {"1ms"}}, _options...) + require.NoError(t, err) type MockState uint8 const ( @@ -1816,7 +1817,10 @@ func TestNewConnection(t *testing.T) { _options := testutils.EnrichOptionsWithOptionsForTesting(t) transport := test.NewTransport(_options...) - codec := NewMessageCodec(test.NewTransportInstance(transport, _options...), _options...) + ti, err := transport.CreateTransportInstance(url.URL{Scheme: "test"}, map[string][]string{"simulatedLatency": {"1ms"}}, _options...) + require.NoError(t, err) + + codec := NewMessageCodec(ti, _options...) 
t.Cleanup(func() { assert.Error(t, codec.Disconnect()) }) diff --git a/plc4go/internal/cbus/MessageCodec_test.go b/plc4go/internal/cbus/MessageCodec_test.go index e45bcac843..cf6d7b66cd 100644 --- a/plc4go/internal/cbus/MessageCodec_test.go +++ b/plc4go/internal/cbus/MessageCodec_test.go @@ -22,7 +22,9 @@ package cbus import ( "context" "fmt" + "net/url" "testing" + "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -75,8 +77,12 @@ func TestMessageCodec_Send(t *testing.T) { _options := testutils.EnrichOptionsWithOptionsForTesting(t) transport := test.NewTransport(_options...) - instance := test.NewTransportInstance(transport, _options...) - codec := NewMessageCodec(instance, _options...) + ti, err := transport.CreateTransportInstance(url.URL{Scheme: "test"}, map[string][]string{"simulatedLatency": {"1ms"}}, _options...) + require.NoError(t, err) + + require.NoError(t, ti.Connect(t.Context())) + ti.(*test.TransportInstance).FillReadBuffer([]byte("@A62120\r@A62120\r")) + codec := NewMessageCodec(ti, _options...) require.NoError(t, codec.Connect(t.Context())) t.Cleanup(func() { assert.NoError(t, codec.Disconnect()) @@ -121,8 +127,8 @@ func TestMessageCodec_Receive(t *testing.T) { name string fields fields args args - setup func(t *testing.T, fields *fields) - manipulator func(t *testing.T, messageCodec *MessageCodec) + setup func(*testing.T, *fields, *args) + manipulator func(*testing.T, *MessageCodec) want spi.Message wantErr assert.ErrorAssertionFunc }{ @@ -137,17 +143,22 @@ func TestMessageCodec_Receive(t *testing.T) { args: args{ ctx: t.Context(), }, - setup: func(t *testing.T, fields *fields) { + setup: func(t *testing.T, fields *fields, args *args) { _options := testutils.EnrichOptionsWithOptionsForTesting(t) transport := test.NewTransport(_options...) - instance := test.NewTransportInstance(transport, _options...) - require.NoError(t, instance.Connect(t.Context())) - codec := NewMessageCodec(instance, _options...) 
+ ti, err := transport.CreateTransportInstance(url.URL{Scheme: "test"}, map[string][]string{"simulatedLatency": {"1ms"}}, _options...) + require.NoError(t, err) + + require.NoError(t, ti.Connect(t.Context())) + codec := NewMessageCodec(ti, _options...) t.Cleanup(func() { assert.Error(t, codec.Disconnect()) }) fields.DefaultCodec = codec + var cancelFunc context.CancelFunc + args.ctx, cancelFunc = context.WithTimeout(args.ctx, 10*time.Millisecond) + t.Cleanup(cancelFunc) }, wantErr: assert.NoError, }, @@ -167,14 +178,16 @@ func TestMessageCodec_Receive(t *testing.T) { 33, ), ), - setup: func(t *testing.T, fields *fields) { + setup: func(t *testing.T, fields *fields, args *args) { _options := testutils.EnrichOptionsWithOptionsForTesting(t) transport := test.NewTransport(_options...) - instance := test.NewTransportInstance(transport, _options...) - require.NoError(t, instance.Connect(t.Context())) - instance.FillReadBuffer([]byte("!")) - codec := NewMessageCodec(instance, _options...) + ti, err := transport.CreateTransportInstance(url.URL{Scheme: "test"}, map[string][]string{"simulatedLatency": {"1ms"}}, _options...) + require.NoError(t, err) + + require.NoError(t, ti.Connect(t.Context())) + ti.(*test.TransportInstance).FillReadBuffer([]byte("!")) + codec := NewMessageCodec(ti, _options...) t.Cleanup(func() { assert.Error(t, codec.Disconnect()) }) @@ -193,14 +206,16 @@ func TestMessageCodec_Receive(t *testing.T) { args: args{ ctx: t.Context(), }, - setup: func(t *testing.T, fields *fields) { + setup: func(t *testing.T, fields *fields, args *args) { _options := testutils.EnrichOptionsWithOptionsForTesting(t) transport := test.NewTransport(_options...) - instance := test.NewTransportInstance(transport, _options...) - require.NoError(t, instance.Connect(t.Context())) - instance.FillReadBuffer([]byte("@A62120\r@A62120\r")) - codec := NewMessageCodec(instance, _options...) 
+ ti, err := transport.CreateTransportInstance(url.URL{Scheme: "test"}, map[string][]string{"simulatedLatency": {"1ms"}}, _options...) + require.NoError(t, err) + + require.NoError(t, ti.Connect(t.Context())) + ti.(*test.TransportInstance).FillReadBuffer([]byte("@A62120\r@A62120\r")) + codec := NewMessageCodec(ti, _options...) t.Cleanup(func() { assert.Error(t, codec.Disconnect()) }) @@ -219,14 +234,17 @@ func TestMessageCodec_Receive(t *testing.T) { args: args{ ctx: t.Context(), }, - setup: func(t *testing.T, fields *fields) { + setup: func(t *testing.T, fields *fields, args *args) { _options := testutils.EnrichOptionsWithOptionsForTesting(t) transport := test.NewTransport(_options...) - instance := test.NewTransportInstance(transport, _options...) - require.NoError(t, instance.Connect(t.Context())) - instance.FillReadBuffer([]byte("what on earth\n\r")) - codec := NewMessageCodec(instance, _options...) + ti, err := transport.CreateTransportInstance(url.URL{Scheme: "test"}, map[string][]string{"simulatedLatency": {"1ms"}}, _options...) + require.NoError(t, err) + + require.NoError(t, ti.Connect(t.Context())) + ti.(*test.TransportInstance).FillReadBuffer([]byte("what on earth\n\r")) + codec := NewMessageCodec(ti, _options...) + t.Cleanup(func() { assert.Error(t, codec.Disconnect()) }) @@ -245,14 +263,17 @@ func TestMessageCodec_Receive(t *testing.T) { args: args{ ctx: t.Context(), }, - setup: func(t *testing.T, fields *fields) { + setup: func(t *testing.T, fields *fields, args *args) { _options := testutils.EnrichOptionsWithOptionsForTesting(t) transport := test.NewTransport(_options...) - instance := test.NewTransportInstance(transport, _options...) - require.NoError(t, instance.Connect(t.Context())) - instance.FillReadBuffer([]byte("AFFE!!!\r")) - codec := NewMessageCodec(instance, _options...) + ti, err := transport.CreateTransportInstance(url.URL{Scheme: "test"}, map[string][]string{"simulatedLatency": {"1ms"}}, _options...) 
+ require.NoError(t, err) + + require.NoError(t, ti.Connect(t.Context())) + ti.(*test.TransportInstance).FillReadBuffer([]byte("AFFE!!!\r")) + codec := NewMessageCodec(ti, _options...) + t.Cleanup(func() { assert.Error(t, codec.Disconnect()) }) @@ -300,14 +321,16 @@ func TestMessageCodec_Receive(t *testing.T) { nil, ), ), - setup: func(t *testing.T, fields *fields) { + setup: func(t *testing.T, fields *fields, args *args) { _options := testutils.EnrichOptionsWithOptionsForTesting(t) transport := test.NewTransport(_options...) - instance := test.NewTransportInstance(transport, _options...) - require.NoError(t, instance.Connect(t.Context())) - instance.FillReadBuffer([]byte("@1A2001!!!\r")) - codec := NewMessageCodec(instance, _options...) + ti, err := transport.CreateTransportInstance(url.URL{Scheme: "test"}, map[string][]string{"simulatedLatency": {"1ms"}}, _options...) + require.NoError(t, err) + + require.NoError(t, ti.Connect(t.Context())) + ti.(*test.TransportInstance).FillReadBuffer([]byte("@1A2001!!!\r")) + codec := NewMessageCodec(ti, _options...) t.Cleanup(func() { assert.Error(t, codec.Disconnect()) }) @@ -326,14 +349,16 @@ func TestMessageCodec_Receive(t *testing.T) { args: args{ ctx: t.Context(), }, - setup: func(t *testing.T, fields *fields) { + setup: func(t *testing.T, fields *fields, args *args) { _options := testutils.EnrichOptionsWithOptionsForTesting(t) transport := test.NewTransport(_options...) - instance := test.NewTransportInstance(transport, _options...) - require.NoError(t, instance.Connect(t.Context())) - instance.FillReadBuffer([]byte("86040200F940380001000000000000000008000000000000000000000000FA\r\n")) - codec := NewMessageCodec(instance, _options...) + ti, err := transport.CreateTransportInstance(url.URL{Scheme: "test"}, map[string][]string{"simulatedLatency": {"1ms"}}, _options...) 
+ require.NoError(t, err) + + require.NoError(t, ti.Connect(t.Context())) + ti.(*test.TransportInstance).FillReadBuffer([]byte("86040200F940380001000000000000000008000000000000000000000000FA\r\n")) + codec := NewMessageCodec(ti, _options...) t.Cleanup(func() { assert.Error(t, codec.Disconnect()) }) @@ -527,14 +552,16 @@ func TestMessageCodec_Receive(t *testing.T) { messageCodec.hashEncountered.Store(9999) messageCodec.currentlyReportedServerErrors.Store(9999) }, - setup: func(t *testing.T, fields *fields) { + setup: func(t *testing.T, fields *fields, args *args) { _options := testutils.EnrichOptionsWithOptionsForTesting(t) transport := test.NewTransport(_options...) - instance := test.NewTransportInstance(transport, _options...) - require.NoError(t, instance.Connect(t.Context())) - instance.FillReadBuffer([]byte("0531AC0079042F0401430316000011\r\n")) - codec := NewMessageCodec(instance, _options...) + ti, err := transport.CreateTransportInstance(url.URL{Scheme: "test"}, map[string][]string{"simulatedLatency": {"1ms"}}, _options...) + require.NoError(t, err) + + require.NoError(t, ti.Connect(t.Context())) + ti.(*test.TransportInstance).FillReadBuffer([]byte("0531AC0079042F0401430316000011\r\n")) + codec := NewMessageCodec(ti, _options...) t.Cleanup(func() { assert.Error(t, codec.Disconnect()) }) @@ -589,7 +616,7 @@ func TestMessageCodec_Receive(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { if tt.setup != nil { - tt.setup(t, &tt.fields) + tt.setup(t, &tt.fields, &tt.args) } m := &MessageCodec{ DefaultCodec: tt.fields.DefaultCodec, @@ -615,33 +642,39 @@ func TestMessageCodec_Receive_Delayed_Response(t *testing.T) { _options := testutils.EnrichOptionsWithOptionsForTesting(t) transport := test.NewTransport(_options...) - transportInstance := test.NewTransportInstance(transport, _options...) - require.NoError(t, transportInstance.Connect(t.Context())) - codec := NewMessageCodec(transportInstance, _options...) 
+ ti, err := transport.CreateTransportInstance(url.URL{Scheme: "test"}, map[string][]string{"simulatedLatency": {"1ms"}}, _options...) + require.NoError(t, err) + require.NoError(t, ti.Connect(t.Context())) + codec := NewMessageCodec(ti, _options...) t.Cleanup(func() { assert.Error(t, codec.Disconnect()) }) codec.requestContext = readWriteModel.NewRequestContext(true) + timeoutCtx := func(timeout time.Duration) context.Context { + withTimeout, cancelFunc := context.WithTimeout(t.Context(), timeout) + t.Cleanup(cancelFunc) + return withTimeout + } + var msg spi.Message - var err error - msg, err = codec.Receive(t.Context()) + msg, err = codec.Receive(timeoutCtx(1 * time.Second)) // No data yet so this should return no error and no data assert.NoError(t, err) assert.Nil(t, msg) // Now we add a confirmation - transportInstance.FillReadBuffer([]byte("i.")) + ti.(*test.TransportInstance).FillReadBuffer([]byte("i.")) // We should wait for more data, so no error, no message - msg, err = codec.Receive(t.Context()) + msg, err = codec.Receive(timeoutCtx(1 * time.Second)) assert.NoError(t, err) assert.Nil(t, msg) // Now we fill in the payload - transportInstance.FillReadBuffer([]byte("86FD0201078900434C495053414C20C2\r\n")) + ti.(*test.TransportInstance).FillReadBuffer([]byte("86FD0201078900434C495053414C20C2\r\n")) - // We should wait for more data, so no error, no message - msg, err = codec.Receive(t.Context()) + // We should wait for more data, so no error + msg, err = codec.Receive(timeoutCtx(2 * time.Second)) assert.NoError(t, err) require.NotNil(t, msg) @@ -652,22 +685,23 @@ func TestMessageCodec_Receive_Delayed_Response(t *testing.T) { _options := testutils.EnrichOptionsWithOptionsForTesting(t) transport := test.NewTransport(_options...) - transportInstance := test.NewTransportInstance(transport, _options...) - require.NoError(t, transportInstance.Connect(t.Context())) - codec := NewMessageCodec(transportInstance, _options...) 
+ ti, err := transport.CreateTransportInstance(url.URL{Scheme: "test"}, map[string][]string{"simulatedLatency": {"1ms"}}, _options...) + require.NoError(t, err) + + require.NoError(t, ti.Connect(t.Context())) + codec := NewMessageCodec(ti, _options...) t.Cleanup(func() { assert.Error(t, codec.Disconnect()) }) codec.requestContext = readWriteModel.NewRequestContext(true) var msg spi.Message - var err error msg, err = codec.Receive(t.Context()) // No data yet so this should return no error and no data assert.NoError(t, err) assert.Nil(t, msg) // Now we add a confirmation - transportInstance.FillReadBuffer([]byte("i.")) + ti.(*test.TransportInstance).FillReadBuffer([]byte("i.")) for i := 0; i < 8; i++ { t.Logf("%d try", i+1) @@ -678,7 +712,7 @@ func TestMessageCodec_Receive_Delayed_Response(t *testing.T) { } // Now we fill in the payload - transportInstance.FillReadBuffer([]byte("86FD0201078900434C495053414C20C2\r\n")) + ti.(*test.TransportInstance).FillReadBuffer([]byte("86FD0201078900434C495053414C20C2\r\n")) // We should wait for more data, so no error, no message msg, err = codec.Receive(t.Context()) @@ -692,22 +726,24 @@ func TestMessageCodec_Receive_Delayed_Response(t *testing.T) { _options := testutils.EnrichOptionsWithOptionsForTesting(t) transport := test.NewTransport(_options...) - transportInstance := test.NewTransportInstance(transport, _options...) - require.NoError(t, transportInstance.Connect(t.Context())) - codec := NewMessageCodec(transportInstance, _options...) + ti, err := transport.CreateTransportInstance(url.URL{Scheme: "test"}, map[string][]string{"simulatedLatency": {"1ms"}}, _options...) + require.NoError(t, err) + + require.NoError(t, ti.Connect(t.Context())) + ti.(*test.TransportInstance).FillReadBuffer([]byte("@A62120\r@A62120\r")) + codec := NewMessageCodec(ti, _options...) 
t.Cleanup(func() { assert.Error(t, codec.Disconnect()) }) codec.requestContext = readWriteModel.NewRequestContext(true) var msg spi.Message - var err error msg, err = codec.Receive(t.Context()) // No data yet so this should return no error and no data assert.NoError(t, err) assert.Nil(t, msg) // Now we add a confirmation - transportInstance.FillReadBuffer([]byte("i.")) + ti.(*test.TransportInstance).FillReadBuffer([]byte("i.")) for i := 0; i <= 15; i++ { t.Logf("%d try", i+1) @@ -728,7 +764,7 @@ func TestMessageCodec_Receive_Delayed_Response(t *testing.T) { } // Now we fill in the payload - transportInstance.FillReadBuffer([]byte("86FD0201078900434C495053414C20C2\r\n")) + ti.(*test.TransportInstance).FillReadBuffer([]byte("86FD0201078900434C495053414C20C2\r\n")) // We should wait for more data, so no error, no message msg, err = codec.Receive(t.Context())
