This is an automated email from the ASF dual-hosted git repository. sruehl pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/plc4x.git
commit fdf8e842b42e858795c5e08439b1d71d8ff3bd25 Author: Sebastian Rühl <[email protected]> AuthorDate: Thu Nov 20 14:01:53 2025 +0100 fix(plc4go/spi): TransportInstance should stop on ctx abort --- plc4go/internal/cbus/Browser_test.go | 3 +-- plc4go/internal/cbus/MessageCodec.go | 7 +++++-- plc4go/spi/transports/test/TransportInstance.go | 2 ++ 3 files changed, 8 insertions(+), 4 deletions(-) diff --git a/plc4go/internal/cbus/Browser_test.go b/plc4go/internal/cbus/Browser_test.go index 860463a1af..f3f7443d99 100644 --- a/plc4go/internal/cbus/Browser_test.go +++ b/plc4go/internal/cbus/Browser_test.go @@ -519,7 +519,7 @@ func TestBrowser_getInstalledUnitAddressBytes(t *testing.T) { transport := test.NewTransport(_options...) transportUrl := url.URL{Scheme: "test"} - transportInstance, err := transport.CreateTransportInstance(transportUrl, map[string][]string{"simulatedLatency": {"10ms"}}, _options...) + transportInstance, err := transport.CreateTransportInstance(transportUrl, map[string][]string{"simulatedLatency": {"0ms"}}, _options...) require.NoError(t, err) t.Cleanup(func() { t.Log(transportInstance.Close()) @@ -629,7 +629,6 @@ func TestBrowser_getInstalledUnitAddressBytes(t *testing.T) { return } assert.Equalf(t, tt.want, got, "getInstalledUnitAddressBytes(%v)", tt.args.ctx) - t.Log("Banananaaaa") }) }) } diff --git a/plc4go/internal/cbus/MessageCodec.go b/plc4go/internal/cbus/MessageCodec.go index 55783800ff..10b65ea2c8 100644 --- a/plc4go/internal/cbus/MessageCodec.go +++ b/plc4go/internal/cbus/MessageCodec.go @@ -134,7 +134,7 @@ func (m *MessageCodec) Receive(ctx context.Context) (spi.Message, error) { // Fill the buffer { fillCtx, fillCtxCancel := context.WithTimeout(ctx, 100*time.Millisecond) - if err := ti.FillBuffer(fillCtx, func(pos uint, currentByte byte, reader transports.ExtendedReader) bool { + if err := ti.FillBuffer(fillCtx, func(pos uint, currentByte byte, reader transports.ExtendedReader) (keepGoing bool) { switch currentByte { case readWriteModel.ResponseTermination_CR, @@ -144,7 +144,10 @@ func (m *MessageCodec) Receive(ctx context.Context) (spi.Message, error) { confirmation = true // In case we have directly more data in the buffer after a confirmation _, err := reader.Peek(int(pos + 1)) - return err == nil + if err != nil { + return false + } + return true case byte(readWriteModel.ConfirmationType_NOT_TRANSMITTED_TO_MANY_RE_TRANSMISSIONS), byte(readWriteModel.ConfirmationType_NOT_TRANSMITTED_CORRUPTION), diff --git a/plc4go/spi/transports/test/TransportInstance.go b/plc4go/spi/transports/test/TransportInstance.go index ffc21b3997..70a39b504f 100644 --- a/plc4go/spi/transports/test/TransportInstance.go +++ b/plc4go/spi/transports/test/TransportInstance.go @@ -166,6 +166,8 @@ func (m *TransportInstance) FillBuffer(ctx context.Context, until func(pos uint, timer := time.NewTimer(m.simulatedLatency) select { case <-ctx.Done(): + m.log.Trace().Msg("Context done") + return ctx.Err() case <-timer.C: } m.log.Trace().Uint32("nBytes", nBytes).Msg("Peeking bytes")
