This is an automated email from the ASF dual-hosted git repository.

sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git

commit fdf8e842b42e858795c5e08439b1d71d8ff3bd25
Author: Sebastian Rühl <[email protected]>
AuthorDate: Thu Nov 20 14:01:53 2025 +0100

    fix(plc4go/spi): TransportInstance should stop on ctx abort
---
 plc4go/internal/cbus/Browser_test.go            | 3 +--
 plc4go/internal/cbus/MessageCodec.go            | 7 +++++--
 plc4go/spi/transports/test/TransportInstance.go | 2 ++
 3 files changed, 8 insertions(+), 4 deletions(-)

diff --git a/plc4go/internal/cbus/Browser_test.go 
b/plc4go/internal/cbus/Browser_test.go
index 860463a1af..f3f7443d99 100644
--- a/plc4go/internal/cbus/Browser_test.go
+++ b/plc4go/internal/cbus/Browser_test.go
@@ -519,7 +519,7 @@ func TestBrowser_getInstalledUnitAddressBytes(t *testing.T) 
{
 
                                transport := test.NewTransport(_options...)
                                transportUrl := url.URL{Scheme: "test"}
-                               transportInstance, err := 
transport.CreateTransportInstance(transportUrl, 
map[string][]string{"simulatedLatency": {"10ms"}}, _options...)
+                               transportInstance, err := 
transport.CreateTransportInstance(transportUrl, 
map[string][]string{"simulatedLatency": {"0ms"}}, _options...)
                                require.NoError(t, err)
                                t.Cleanup(func() {
                                        t.Log(transportInstance.Close())
@@ -629,7 +629,6 @@ func TestBrowser_getInstalledUnitAddressBytes(t *testing.T) 
{
                                        return
                                }
                                assert.Equalf(t, tt.want, got, 
"getInstalledUnitAddressBytes(%v)", tt.args.ctx)
-                               t.Log("Banananaaaa")
                        })
                })
        }
diff --git a/plc4go/internal/cbus/MessageCodec.go 
b/plc4go/internal/cbus/MessageCodec.go
index 55783800ff..10b65ea2c8 100644
--- a/plc4go/internal/cbus/MessageCodec.go
+++ b/plc4go/internal/cbus/MessageCodec.go
@@ -134,7 +134,7 @@ func (m *MessageCodec) Receive(ctx context.Context) 
(spi.Message, error) {
        // Fill the buffer
        {
                fillCtx, fillCtxCancel := context.WithTimeout(ctx, 
100*time.Millisecond)
-               if err := ti.FillBuffer(fillCtx, func(pos uint, currentByte 
byte, reader transports.ExtendedReader) bool {
+               if err := ti.FillBuffer(fillCtx, func(pos uint, currentByte 
byte, reader transports.ExtendedReader) (keepGoing bool) {
                        switch currentByte {
                        case
                                readWriteModel.ResponseTermination_CR,
@@ -144,7 +144,10 @@ func (m *MessageCodec) Receive(ctx context.Context) 
(spi.Message, error) {
                                confirmation = true
                                // In case we have directly more data in the 
buffer after a confirmation
                                _, err := reader.Peek(int(pos + 1))
-                               return err == nil
+                               if err != nil {
+                                       return false
+                               }
+                               return true
                        case
                                
byte(readWriteModel.ConfirmationType_NOT_TRANSMITTED_TO_MANY_RE_TRANSMISSIONS),
                                
byte(readWriteModel.ConfirmationType_NOT_TRANSMITTED_CORRUPTION),
diff --git a/plc4go/spi/transports/test/TransportInstance.go 
b/plc4go/spi/transports/test/TransportInstance.go
index ffc21b3997..70a39b504f 100644
--- a/plc4go/spi/transports/test/TransportInstance.go
+++ b/plc4go/spi/transports/test/TransportInstance.go
@@ -166,6 +166,8 @@ func (m *TransportInstance) FillBuffer(ctx context.Context, 
until func(pos uint,
                timer := time.NewTimer(m.simulatedLatency)
                select {
                case <-ctx.Done():
+                       m.log.Trace().Msg("Context done")
+                       return ctx.Err()
                case <-timer.C:
                }
                m.log.Trace().Uint32("nBytes", nBytes).Msg("Peeking bytes")

Reply via email to