This is an automated email from the ASF dual-hosted git repository.

sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git

commit 68e9b54b49cbff845a555f7c49d8b8fcc8ccf643
Author: Sebastian Rühl <[email protected]>
AuthorDate: Thu Nov 13 16:09:18 2025 +0100

    test(plc4go): fix unstable tests
---
 plc4go/internal/cbus/Browser_test.go      |   1 +
 plc4go/internal/cbus/Connection.go        |   6 ++
 plc4go/internal/cbus/Connection_test.go   |  10 +-
 plc4go/internal/cbus/MessageCodec_test.go | 168 ++++++++++++++++++------------
 4 files changed, 116 insertions(+), 69 deletions(-)

diff --git a/plc4go/internal/cbus/Browser_test.go 
b/plc4go/internal/cbus/Browser_test.go
index fe6b731dd0..aeda64d9d6 100644
--- a/plc4go/internal/cbus/Browser_test.go
+++ b/plc4go/internal/cbus/Browser_test.go
@@ -589,6 +589,7 @@ func TestBrowser_getInstalledUnitAddressBytes(t *testing.T) 
{
                                require.NoError(t, 
connectionConnectResult.GetErr())
                                fields.connection = 
connectionConnectResult.GetConnection()
                                t.Cleanup(func() {
+                                       t.Log("shutting down connection")
                                        
t.Log(fields.connection.BlockingClose(t.Context()))
                                })
 
diff --git a/plc4go/internal/cbus/Connection.go 
b/plc4go/internal/cbus/Connection.go
index d384d1ced2..d7d0a76b0f 100644
--- a/plc4go/internal/cbus/Connection.go
+++ b/plc4go/internal/cbus/Connection.go
@@ -234,6 +234,8 @@ func (c *Connection) setupConnection(ctx context.Context, 
ch chan plc4go.PlcConn
        cbusOptions := &c.messageCodec.cbusOptions
        requestContext := &c.messageCodec.requestContext
 
+       c.log.Trace().Msg("Starting connection setup")
+       c.log.Trace().Msg("Sending reset")
        if !c.sendReset(ctx, ch, false) {
                c.log.Warn().Msg("First reset failed")
                // We try a second reset in case we get a power up
@@ -242,18 +244,22 @@ func (c *Connection) setupConnection(ctx context.Context, 
ch chan plc4go.PlcConn
                        return
                }
        }
+       c.log.Trace().Msg("setting application filter")
        if !c.setApplicationFilter(ctx, ch, requestContext, cbusOptions) {
                c.log.Trace().Msg("Set application filter failed")
                return
        }
+       c.log.Trace().Msg("Setting interface options 3")
        if !c.setInterfaceOptions3(ctx, ch, requestContext, cbusOptions) {
                c.log.Trace().Msg("Set interface options 3 failed")
                return
        }
+       c.log.Trace().Msg("Setting interface options 1 power up settings")
        if !c.setInterface1PowerUpSettings(ctx, ch, requestContext, 
cbusOptions) {
                c.log.Trace().Msg("Set interface options 1 power up settings 
failed")
                return
        }
+       c.log.Trace().Msg("Setting interface options 1")
        if !c.setInterfaceOptions1(ctx, ch, requestContext, cbusOptions) {
                c.log.Trace().Msg("Set interface options 1 failed")
                return
diff --git a/plc4go/internal/cbus/Connection_test.go 
b/plc4go/internal/cbus/Connection_test.go
index 10f2bf973a..ebe5b4caa7 100644
--- a/plc4go/internal/cbus/Connection_test.go
+++ b/plc4go/internal/cbus/Connection_test.go
@@ -918,7 +918,7 @@ func TestConnection_sendReset(t *testing.T) {
 
                                args.ctx = testutils.TestContext(t)
                                var cancelFunc context.CancelFunc
-                               args.ctx, cancelFunc = 
context.WithTimeout(args.ctx, 20*time.Second)
+                               args.ctx, cancelFunc = 
context.WithTimeout(args.ctx, 2*time.Second)
                                t.Cleanup(cancelFunc)
                        },
                        wantOk: false,
@@ -1575,7 +1575,8 @@ func TestConnection_setupConnection(t *testing.T) {
 
                                // Build the message codec
                                transport := test.NewTransport(_options...)
-                               ti, err := 
transport.CreateTransportInstance(url.URL{Scheme: "test"}, 
map[string][]string{"simulatedLatency": {"10ms"}}, _options...)
+                               ti, err := 
transport.CreateTransportInstance(url.URL{Scheme: "test"}, 
map[string][]string{"simulatedLatency": {"1ms"}}, _options...)
+                               require.NoError(t, err)
 
                                type MockState uint8
                                const (
@@ -1816,7 +1817,10 @@ func TestNewConnection(t *testing.T) {
                                _options := 
testutils.EnrichOptionsWithOptionsForTesting(t)
 
                                transport := test.NewTransport(_options...)
-                               codec := 
NewMessageCodec(test.NewTransportInstance(transport, _options...), _options...)
+                               ti, err := 
transport.CreateTransportInstance(url.URL{Scheme: "test"}, 
map[string][]string{"simulatedLatency": {"1ms"}}, _options...)
+                               require.NoError(t, err)
+
+                               codec := NewMessageCodec(ti, _options...)
                                t.Cleanup(func() {
                                        assert.Error(t, codec.Disconnect())
                                })
diff --git a/plc4go/internal/cbus/MessageCodec_test.go 
b/plc4go/internal/cbus/MessageCodec_test.go
index e45bcac843..cf6d7b66cd 100644
--- a/plc4go/internal/cbus/MessageCodec_test.go
+++ b/plc4go/internal/cbus/MessageCodec_test.go
@@ -22,7 +22,9 @@ package cbus
 import (
        "context"
        "fmt"
+       "net/url"
        "testing"
+       "time"
 
        "github.com/stretchr/testify/assert"
        "github.com/stretchr/testify/require"
@@ -75,8 +77,12 @@ func TestMessageCodec_Send(t *testing.T) {
                                _options := 
testutils.EnrichOptionsWithOptionsForTesting(t)
 
                                transport := test.NewTransport(_options...)
-                               instance := 
test.NewTransportInstance(transport, _options...)
-                               codec := NewMessageCodec(instance, _options...)
+                               ti, err := 
transport.CreateTransportInstance(url.URL{Scheme: "test"}, 
map[string][]string{"simulatedLatency": {"1ms"}}, _options...)
+                               require.NoError(t, err)
+
+                               require.NoError(t, ti.Connect(t.Context()))
+                               
ti.(*test.TransportInstance).FillReadBuffer([]byte("@A62120\r@A62120\r"))
+                               codec := NewMessageCodec(ti, _options...)
                                require.NoError(t, codec.Connect(t.Context()))
                                t.Cleanup(func() {
                                        assert.NoError(t, codec.Disconnect())
@@ -121,8 +127,8 @@ func TestMessageCodec_Receive(t *testing.T) {
                name        string
                fields      fields
                args        args
-               setup       func(t *testing.T, fields *fields)
-               manipulator func(t *testing.T, messageCodec *MessageCodec)
+               setup       func(*testing.T, *fields, *args)
+               manipulator func(*testing.T, *MessageCodec)
                want        spi.Message
                wantErr     assert.ErrorAssertionFunc
        }{
@@ -137,17 +143,22 @@ func TestMessageCodec_Receive(t *testing.T) {
                        args: args{
                                ctx: t.Context(),
                        },
-                       setup: func(t *testing.T, fields *fields) {
+                       setup: func(t *testing.T, fields *fields, args *args) {
                                _options := 
testutils.EnrichOptionsWithOptionsForTesting(t)
 
                                transport := test.NewTransport(_options...)
-                               instance := 
test.NewTransportInstance(transport, _options...)
-                               require.NoError(t, 
instance.Connect(t.Context()))
-                               codec := NewMessageCodec(instance, _options...)
+                               ti, err := 
transport.CreateTransportInstance(url.URL{Scheme: "test"}, 
map[string][]string{"simulatedLatency": {"1ms"}}, _options...)
+                               require.NoError(t, err)
+
+                               require.NoError(t, ti.Connect(t.Context()))
+                               codec := NewMessageCodec(ti, _options...)
                                t.Cleanup(func() {
                                        assert.Error(t, codec.Disconnect())
                                })
                                fields.DefaultCodec = codec
+                               var cancelFunc context.CancelFunc
+                               args.ctx, cancelFunc = 
context.WithTimeout(args.ctx, 10*time.Millisecond)
+                               t.Cleanup(cancelFunc)
                        },
                        wantErr: assert.NoError,
                },
@@ -167,14 +178,16 @@ func TestMessageCodec_Receive(t *testing.T) {
                                        33,
                                ),
                        ),
-                       setup: func(t *testing.T, fields *fields) {
+                       setup: func(t *testing.T, fields *fields, args *args) {
                                _options := 
testutils.EnrichOptionsWithOptionsForTesting(t)
 
                                transport := test.NewTransport(_options...)
-                               instance := 
test.NewTransportInstance(transport, _options...)
-                               require.NoError(t, 
instance.Connect(t.Context()))
-                               instance.FillReadBuffer([]byte("!"))
-                               codec := NewMessageCodec(instance, _options...)
+                               ti, err := 
transport.CreateTransportInstance(url.URL{Scheme: "test"}, 
map[string][]string{"simulatedLatency": {"1ms"}}, _options...)
+                               require.NoError(t, err)
+
+                               require.NoError(t, ti.Connect(t.Context()))
+                               
ti.(*test.TransportInstance).FillReadBuffer([]byte("!"))
+                               codec := NewMessageCodec(ti, _options...)
                                t.Cleanup(func() {
                                        assert.Error(t, codec.Disconnect())
                                })
@@ -193,14 +206,16 @@ func TestMessageCodec_Receive(t *testing.T) {
                        args: args{
                                ctx: t.Context(),
                        },
-                       setup: func(t *testing.T, fields *fields) {
+                       setup: func(t *testing.T, fields *fields, args *args) {
                                _options := 
testutils.EnrichOptionsWithOptionsForTesting(t)
 
                                transport := test.NewTransport(_options...)
-                               instance := 
test.NewTransportInstance(transport, _options...)
-                               require.NoError(t, 
instance.Connect(t.Context()))
-                               
instance.FillReadBuffer([]byte("@A62120\r@A62120\r"))
-                               codec := NewMessageCodec(instance, _options...)
+                               ti, err := 
transport.CreateTransportInstance(url.URL{Scheme: "test"}, 
map[string][]string{"simulatedLatency": {"1ms"}}, _options...)
+                               require.NoError(t, err)
+
+                               require.NoError(t, ti.Connect(t.Context()))
+                               
ti.(*test.TransportInstance).FillReadBuffer([]byte("@A62120\r@A62120\r"))
+                               codec := NewMessageCodec(ti, _options...)
                                t.Cleanup(func() {
                                        assert.Error(t, codec.Disconnect())
                                })
@@ -219,14 +234,17 @@ func TestMessageCodec_Receive(t *testing.T) {
                        args: args{
                                ctx: t.Context(),
                        },
-                       setup: func(t *testing.T, fields *fields) {
+                       setup: func(t *testing.T, fields *fields, args *args) {
                                _options := 
testutils.EnrichOptionsWithOptionsForTesting(t)
 
                                transport := test.NewTransport(_options...)
-                               instance := 
test.NewTransportInstance(transport, _options...)
-                               require.NoError(t, 
instance.Connect(t.Context()))
-                               instance.FillReadBuffer([]byte("what on 
earth\n\r"))
-                               codec := NewMessageCodec(instance, _options...)
+                               ti, err := 
transport.CreateTransportInstance(url.URL{Scheme: "test"}, 
map[string][]string{"simulatedLatency": {"1ms"}}, _options...)
+                               require.NoError(t, err)
+
+                               require.NoError(t, ti.Connect(t.Context()))
+                               
ti.(*test.TransportInstance).FillReadBuffer([]byte("what on earth\n\r"))
+                               codec := NewMessageCodec(ti, _options...)
+
                                t.Cleanup(func() {
                                        assert.Error(t, codec.Disconnect())
                                })
@@ -245,14 +263,17 @@ func TestMessageCodec_Receive(t *testing.T) {
                        args: args{
                                ctx: t.Context(),
                        },
-                       setup: func(t *testing.T, fields *fields) {
+                       setup: func(t *testing.T, fields *fields, args *args) {
                                _options := 
testutils.EnrichOptionsWithOptionsForTesting(t)
 
                                transport := test.NewTransport(_options...)
-                               instance := 
test.NewTransportInstance(transport, _options...)
-                               require.NoError(t, 
instance.Connect(t.Context()))
-                               instance.FillReadBuffer([]byte("AFFE!!!\r"))
-                               codec := NewMessageCodec(instance, _options...)
+                               ti, err := 
transport.CreateTransportInstance(url.URL{Scheme: "test"}, 
map[string][]string{"simulatedLatency": {"1ms"}}, _options...)
+                               require.NoError(t, err)
+
+                               require.NoError(t, ti.Connect(t.Context()))
+                               
ti.(*test.TransportInstance).FillReadBuffer([]byte("AFFE!!!\r"))
+                               codec := NewMessageCodec(ti, _options...)
+
                                t.Cleanup(func() {
                                        assert.Error(t, codec.Disconnect())
                                })
@@ -300,14 +321,16 @@ func TestMessageCodec_Receive(t *testing.T) {
                                        nil,
                                ),
                        ),
-                       setup: func(t *testing.T, fields *fields) {
+                       setup: func(t *testing.T, fields *fields, args *args) {
                                _options := 
testutils.EnrichOptionsWithOptionsForTesting(t)
 
                                transport := test.NewTransport(_options...)
-                               instance := 
test.NewTransportInstance(transport, _options...)
-                               require.NoError(t, 
instance.Connect(t.Context()))
-                               instance.FillReadBuffer([]byte("@1A2001!!!\r"))
-                               codec := NewMessageCodec(instance, _options...)
+                               ti, err := 
transport.CreateTransportInstance(url.URL{Scheme: "test"}, 
map[string][]string{"simulatedLatency": {"1ms"}}, _options...)
+                               require.NoError(t, err)
+
+                               require.NoError(t, ti.Connect(t.Context()))
+                               
ti.(*test.TransportInstance).FillReadBuffer([]byte("@1A2001!!!\r"))
+                               codec := NewMessageCodec(ti, _options...)
                                t.Cleanup(func() {
                                        assert.Error(t, codec.Disconnect())
                                })
@@ -326,14 +349,16 @@ func TestMessageCodec_Receive(t *testing.T) {
                        args: args{
                                ctx: t.Context(),
                        },
-                       setup: func(t *testing.T, fields *fields) {
+                       setup: func(t *testing.T, fields *fields, args *args) {
                                _options := 
testutils.EnrichOptionsWithOptionsForTesting(t)
 
                                transport := test.NewTransport(_options...)
-                               instance := 
test.NewTransportInstance(transport, _options...)
-                               require.NoError(t, 
instance.Connect(t.Context()))
-                               
instance.FillReadBuffer([]byte("86040200F940380001000000000000000008000000000000000000000000FA\r\n"))
-                               codec := NewMessageCodec(instance, _options...)
+                               ti, err := 
transport.CreateTransportInstance(url.URL{Scheme: "test"}, 
map[string][]string{"simulatedLatency": {"1ms"}}, _options...)
+                               require.NoError(t, err)
+
+                               require.NoError(t, ti.Connect(t.Context()))
+                               
ti.(*test.TransportInstance).FillReadBuffer([]byte("86040200F940380001000000000000000008000000000000000000000000FA\r\n"))
+                               codec := NewMessageCodec(ti, _options...)
                                t.Cleanup(func() {
                                        assert.Error(t, codec.Disconnect())
                                })
@@ -527,14 +552,16 @@ func TestMessageCodec_Receive(t *testing.T) {
                                messageCodec.hashEncountered.Store(9999)
                                
messageCodec.currentlyReportedServerErrors.Store(9999)
                        },
-                       setup: func(t *testing.T, fields *fields) {
+                       setup: func(t *testing.T, fields *fields, args *args) {
                                _options := 
testutils.EnrichOptionsWithOptionsForTesting(t)
 
                                transport := test.NewTransport(_options...)
-                               instance := 
test.NewTransportInstance(transport, _options...)
-                               require.NoError(t, 
instance.Connect(t.Context()))
-                               
instance.FillReadBuffer([]byte("0531AC0079042F0401430316000011\r\n"))
-                               codec := NewMessageCodec(instance, _options...)
+                               ti, err := 
transport.CreateTransportInstance(url.URL{Scheme: "test"}, 
map[string][]string{"simulatedLatency": {"1ms"}}, _options...)
+                               require.NoError(t, err)
+
+                               require.NoError(t, ti.Connect(t.Context()))
+                               
ti.(*test.TransportInstance).FillReadBuffer([]byte("0531AC0079042F0401430316000011\r\n"))
+                               codec := NewMessageCodec(ti, _options...)
                                t.Cleanup(func() {
                                        assert.Error(t, codec.Disconnect())
                                })
@@ -589,7 +616,7 @@ func TestMessageCodec_Receive(t *testing.T) {
        for _, tt := range tests {
                t.Run(tt.name, func(t *testing.T) {
                        if tt.setup != nil {
-                               tt.setup(t, &tt.fields)
+                               tt.setup(t, &tt.fields, &tt.args)
                        }
                        m := &MessageCodec{
                                DefaultCodec:   tt.fields.DefaultCodec,
@@ -615,33 +642,39 @@ func TestMessageCodec_Receive_Delayed_Response(t 
*testing.T) {
                _options := testutils.EnrichOptionsWithOptionsForTesting(t)
 
                transport := test.NewTransport(_options...)
-               transportInstance := test.NewTransportInstance(transport, 
_options...)
-               require.NoError(t, transportInstance.Connect(t.Context()))
-               codec := NewMessageCodec(transportInstance, _options...)
+               ti, err := transport.CreateTransportInstance(url.URL{Scheme: 
"test"}, map[string][]string{"simulatedLatency": {"1ms"}}, _options...)
+               require.NoError(t, err)
+               require.NoError(t, ti.Connect(t.Context()))
+               codec := NewMessageCodec(ti, _options...)
                t.Cleanup(func() {
                        assert.Error(t, codec.Disconnect())
                })
                codec.requestContext = readWriteModel.NewRequestContext(true)
 
+               timeoutCtx := func(timeout time.Duration) context.Context {
+                       withTimeout, cancelFunc := 
context.WithTimeout(t.Context(), timeout)
+                       t.Cleanup(cancelFunc)
+                       return withTimeout
+               }
+
                var msg spi.Message
-               var err error
-               msg, err = codec.Receive(t.Context())
+               msg, err = codec.Receive(timeoutCtx(1 * time.Second))
                // No data yet so this should return no error and no data
                assert.NoError(t, err)
                assert.Nil(t, msg)
                // Now we add a confirmation
-               transportInstance.FillReadBuffer([]byte("i."))
+               ti.(*test.TransportInstance).FillReadBuffer([]byte("i."))
 
                // We should wait for more data, so no error, no message
-               msg, err = codec.Receive(t.Context())
+               msg, err = codec.Receive(timeoutCtx(1 * time.Second))
                assert.NoError(t, err)
                assert.Nil(t, msg)
 
                // Now we fill in the payload
-               
transportInstance.FillReadBuffer([]byte("86FD0201078900434C495053414C20C2\r\n"))
+               
ti.(*test.TransportInstance).FillReadBuffer([]byte("86FD0201078900434C495053414C20C2\r\n"))
 
-               // We should wait for more data, so no error, no message
-               msg, err = codec.Receive(t.Context())
+               // We should wait for more data, so no error
+               msg, err = codec.Receive(timeoutCtx(2 * time.Second))
                assert.NoError(t, err)
                require.NotNil(t, msg)
 
@@ -652,22 +685,23 @@ func TestMessageCodec_Receive_Delayed_Response(t 
*testing.T) {
                _options := testutils.EnrichOptionsWithOptionsForTesting(t)
 
                transport := test.NewTransport(_options...)
-               transportInstance := test.NewTransportInstance(transport, 
_options...)
-               require.NoError(t, transportInstance.Connect(t.Context()))
-               codec := NewMessageCodec(transportInstance, _options...)
+               ti, err := transport.CreateTransportInstance(url.URL{Scheme: 
"test"}, map[string][]string{"simulatedLatency": {"1ms"}}, _options...)
+               require.NoError(t, err)
+
+               require.NoError(t, ti.Connect(t.Context()))
+               codec := NewMessageCodec(ti, _options...)
                t.Cleanup(func() {
                        assert.Error(t, codec.Disconnect())
                })
                codec.requestContext = readWriteModel.NewRequestContext(true)
 
                var msg spi.Message
-               var err error
                msg, err = codec.Receive(t.Context())
                // No data yet so this should return no error and no data
                assert.NoError(t, err)
                assert.Nil(t, msg)
                // Now we add a confirmation
-               transportInstance.FillReadBuffer([]byte("i."))
+               ti.(*test.TransportInstance).FillReadBuffer([]byte("i."))
 
                for i := 0; i < 8; i++ {
                        t.Logf("%d try", i+1)
@@ -678,7 +712,7 @@ func TestMessageCodec_Receive_Delayed_Response(t 
*testing.T) {
                }
 
                // Now we fill in the payload
-               
transportInstance.FillReadBuffer([]byte("86FD0201078900434C495053414C20C2\r\n"))
+               
ti.(*test.TransportInstance).FillReadBuffer([]byte("86FD0201078900434C495053414C20C2\r\n"))
 
                // We should wait for more data, so no error, no message
                msg, err = codec.Receive(t.Context())
@@ -692,22 +726,24 @@ func TestMessageCodec_Receive_Delayed_Response(t 
*testing.T) {
                _options := testutils.EnrichOptionsWithOptionsForTesting(t)
 
                transport := test.NewTransport(_options...)
-               transportInstance := test.NewTransportInstance(transport, 
_options...)
-               require.NoError(t, transportInstance.Connect(t.Context()))
-               codec := NewMessageCodec(transportInstance, _options...)
+               ti, err := transport.CreateTransportInstance(url.URL{Scheme: 
"test"}, map[string][]string{"simulatedLatency": {"1ms"}}, _options...)
+               require.NoError(t, err)
+
+               require.NoError(t, ti.Connect(t.Context()))
+               
ti.(*test.TransportInstance).FillReadBuffer([]byte("@A62120\r@A62120\r"))
+               codec := NewMessageCodec(ti, _options...)
                t.Cleanup(func() {
                        assert.Error(t, codec.Disconnect())
                })
                codec.requestContext = readWriteModel.NewRequestContext(true)
 
                var msg spi.Message
-               var err error
                msg, err = codec.Receive(t.Context())
                // No data yet so this should return no error and no data
                assert.NoError(t, err)
                assert.Nil(t, msg)
                // Now we add a confirmation
-               transportInstance.FillReadBuffer([]byte("i."))
+               ti.(*test.TransportInstance).FillReadBuffer([]byte("i."))
 
                for i := 0; i <= 15; i++ {
                        t.Logf("%d try", i+1)
@@ -728,7 +764,7 @@ func TestMessageCodec_Receive_Delayed_Response(t 
*testing.T) {
                }
 
                // Now we fill in the payload
-               
transportInstance.FillReadBuffer([]byte("86FD0201078900434C495053414C20C2\r\n"))
+               
ti.(*test.TransportInstance).FillReadBuffer([]byte("86FD0201078900434C495053414C20C2\r\n"))
 
                // We should wait for more data, so no error, no message
                msg, err = codec.Receive(t.Context())

Reply via email to