This is an automated email from the ASF dual-hosted git repository. sruehl pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/plc4x.git
commit 750dff6481157985abf314e7bf026b0aa9d38a22 Author: Sebastian Rühl <[email protected]> AuthorDate: Thu Nov 13 10:36:28 2025 +0100 test(plc4go): fixed issues with concurrency --- plc4go/internal/cbus/Browser_test.go | 6 +- plc4go/internal/cbus/Connection.go | 22 ++++--- plc4go/internal/cbus/Connection_test.go | 34 ++++------- plc4go/internal/cbus/Driver_test.go | 8 +-- plc4go/internal/cbus/MessageCodec.go | 5 +- plc4go/internal/cbus/MessageCodec_test.go | 23 +++++-- plc4go/internal/cbus/Reader_test.go | 31 +++++----- plc4go/internal/cbus/Writer_test.go | 6 +- plc4go/spi/default/DefaultCodec.go | 7 ++- plc4go/spi/default/DefaultCodec_test.go | 6 +- .../SingleItemRequestInterceptor_test.go | 12 ++-- plc4go/spi/transports/test/TransportInstance.go | 71 ++++++++++++---------- .../spi/transports/test/TransportInstance_test.go | 1 + plc4go/spi/transports/test/Transport_test.go | 35 ++++++----- .../test/transportInstanceDrivenExtendedReader.go | 4 -- 15 files changed, 147 insertions(+), 124 deletions(-) diff --git a/plc4go/internal/cbus/Browser_test.go b/plc4go/internal/cbus/Browser_test.go index 308fa89513..db63663c2d 100644 --- a/plc4go/internal/cbus/Browser_test.go +++ b/plc4go/internal/cbus/Browser_test.go @@ -326,7 +326,7 @@ func TestBrowser_browseUnitInfo(t *testing.T) { args.ctx = testutils.TestContext(t) var cancelFunc context.CancelFunc - args.ctx, cancelFunc = context.WithTimeout(args.ctx, 2*time.Second) + args.ctx, cancelFunc = context.WithTimeout(args.ctx, 20*time.Second) t.Cleanup(cancelFunc) }, wantResponseCode: apiModel.PlcResponseCode_OK, @@ -395,7 +395,7 @@ func TestBrowser_extractUnits(t *testing.T) { setup: func(t *testing.T, fields *fields, args *args) { args.ctx = testutils.TestContext(t) var cancelFunc context.CancelFunc - args.ctx, cancelFunc = context.WithTimeout(args.ctx, 2*time.Second) + args.ctx, cancelFunc = context.WithTimeout(args.ctx, 20*time.Second) t.Cleanup(cancelFunc) }, want: []readWriteModel.UnitAddress{readWriteModel.NewUnitAddress(2)}, @@ -595,7 +595,7 @@ func TestBrowser_getInstalledUnitAddressBytes(t *testing.T) { args.ctx = testutils.TestContext(t) var cancelFunc context.CancelFunc - args.ctx, cancelFunc = context.WithTimeout(args.ctx, 2*time.Second) + args.ctx, cancelFunc = context.WithTimeout(args.ctx, 20*time.Second) t.Cleanup(cancelFunc) }, want: map[byte]any{ diff --git a/plc4go/internal/cbus/Connection.go b/plc4go/internal/cbus/Connection.go index 5cddb6034a..203317d86b 100644 --- a/plc4go/internal/cbus/Connection.go +++ b/plc4go/internal/cbus/Connection.go @@ -234,10 +234,10 @@ func (c *Connection) setupConnection(ctx context.Context, ch chan plc4go.PlcConn cbusOptions := &c.messageCodec.cbusOptions requestContext := &c.messageCodec.requestContext - if !c.sendReset(ctx, ch, cbusOptions, requestContext, false) { + if !c.sendReset(ctx, ch, false) { c.log.Warn().Msg("First reset failed") // We try a second reset in case we get a power up - if !c.sendReset(ctx, ch, cbusOptions, requestContext, true) { + if !c.sendReset(ctx, ch, true) { c.log.Trace().Msg("Reset failed") return } @@ -343,7 +343,7 @@ func (c *Connection) startSubscriptionHandler() { }) } -func (c *Connection) sendReset(ctx context.Context, ch chan plc4go.PlcConnectionConnectResult, cbusOptions *readWriteModel.CBusOptions, requestContext *readWriteModel.RequestContext, sendOutErrorNotification bool) (ok bool) { +func (c *Connection) sendReset(ctx context.Context, ch chan plc4go.PlcConnectionConnectResult, sendOutErrorNotification bool) (ok bool) { c.log.Debug().Bool("sendOutErrorNotification", sendOutErrorNotification).Msg("Send a reset") requestTypeReset := readWriteModel.RequestType_RESET requestReset := readWriteModel.NewRequestReset( @@ -442,7 +442,7 @@ func (c *Connection) sendReset(ctx context.Context, ch chan plc4go.PlcConnection } startTime := time.Now() - timeout := time.NewTimer(time.Millisecond * 500) + timer := time.NewTimer(ttl) select { case <-receivedResetEchoChan: c.log.Debug().Msg("We received the echo") @@ -453,7 +453,7 @@ func (c *Connection) sendReset(ctx context.Context, ch chan plc4go.PlcConnection c.log.Trace().Err(err).Msg("connect failed") } return false - case timeout := <-timeout.C: + case timeout := <-timer.C: if sendOutErrorNotification { c.fireConnectionError(errors.Errorf("Timeout after %v", timeout.Sub(startTime)), ch) } else { @@ -593,27 +593,33 @@ func (c *Connection) sendCalDataWrite(ctx context.Context, ch chan plc4go.PlcCon return nil }, func(err error) error { + c.log.Trace().Err(err).Msg("got error processing request") select { case directCommandAckErrorChan <- errors.Wrap(err, "got error processing request"): + c.log.Trace().Msg("error redirected") default: + c.log.Trace().Err(err).Msg("error discarded") } return nil }, ttl, ); err != nil { + c.log.Trace().Err(err).Msg("got error sending request") c.fireConnectionError(errors.Wrap(err, "Error during sending of write request"), ch) return false } startTime := time.Now() - timeout := time.NewTimer(60 * time.Second) + timer := time.NewTimer(60 * time.Second) select { case <-directCommandAckChan: - c.log.Debug().Msg("We received the ack") + c.log.Trace().Msg("We received the ack") case err := <-directCommandAckErrorChan: + c.log.Trace().Err(err).Msg("got error processing request") c.fireConnectionError(errors.Wrap(err, "Error receiving of ack"), ch) return false - case timeout := <-timeout.C: + case timeout := <-timer.C: + c.log.Trace().Dur("timeout", timeout.Sub(startTime)).Msg("Timeout") c.fireConnectionError(errors.Errorf("Timeout after %v", timeout.Sub(startTime)), ch) return false } diff --git a/plc4go/internal/cbus/Connection_test.go b/plc4go/internal/cbus/Connection_test.go index b3c33f76e7..5ace85f392 100644 --- a/plc4go/internal/cbus/Connection_test.go +++ b/plc4go/internal/cbus/Connection_test.go @@ -143,7 +143,7 @@ func TestConnection_Connect(t *testing.T) { name string fields fields args args - setup func(t *testing.T, fields *fields) + setup func(*testing.T, *fields, *args) wantAsserter func(*testing.T, <-chan plc4go.PlcConnectionConnectResult) bool }{ { @@ -166,10 +166,7 @@ func TestConnection_Connect(t *testing.T) { connectionId: "connectionId13", tracer: nil, }, - args: args{ - ctx: t.Context(), - }, - setup: func(t *testing.T, fields *fields) { + setup: func(t *testing.T, fields *fields, args *args) { _options := testutils.EnrichOptionsWithOptionsForTesting(t) transport := test.NewTransport(_options...) @@ -180,6 +177,10 @@ func TestConnection_Connect(t *testing.T) { assert.Error(t, codec.Disconnect()) }) fields.messageCodec = codec + + var cancelFunc context.CancelFunc + args.ctx, cancelFunc = context.WithTimeout(args.ctx, 2*time.Second) + t.Cleanup(cancelFunc) }, wantAsserter: func(t *testing.T, results <-chan plc4go.PlcConnectionConnectResult) bool { assert.NotNil(t, results) @@ -194,7 +195,7 @@ func TestConnection_Connect(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { if tt.setup != nil { - tt.setup(t, &tt.fields) + tt.setup(t, &tt.fields, &tt.args) } c := &Connection{ messageCodec: tt.fields.messageCodec, @@ -887,8 +888,6 @@ func TestConnection_sendReset(t *testing.T) { type args struct { ctx context.Context ch chan plc4go.PlcConnectionConnectResult - cbusOptions *readWriteModel.CBusOptions - requestContext *readWriteModel.RequestContext sendOutErrorNotification bool } tests := []struct { @@ -901,15 +900,7 @@ func TestConnection_sendReset(t *testing.T) { { name: "send reset", args: args{ - ch: make(chan plc4go.PlcConnectionConnectResult, 1), - cbusOptions: func() *readWriteModel.CBusOptions { - var cBusOptions readWriteModel.CBusOptions = readWriteModel.NewCBusOptions(false, false, false, false, false, false, false, false, false) - return &cBusOptions - }(), - requestContext: func() *readWriteModel.RequestContext { - var requestContext readWriteModel.RequestContext = readWriteModel.NewRequestContext(false) - return &requestContext - }(), + ch: make(chan plc4go.PlcConnectionConnectResult, 1), sendOutErrorNotification: false, }, setup: func(t *testing.T, fields *fields, args *args) { @@ -926,7 +917,7 @@ func TestConnection_sendReset(t *testing.T) { args.ctx = testutils.TestContext(t) var cancelFunc context.CancelFunc - args.ctx, cancelFunc = context.WithTimeout(args.ctx, 2*time.Second) + args.ctx, cancelFunc = context.WithTimeout(args.ctx, 20*time.Second) t.Cleanup(cancelFunc) }, wantOk: false, @@ -948,7 +939,7 @@ func TestConnection_sendReset(t *testing.T) { log: testutils.ProduceTestingLogger(t), } c.DefaultConnection = _default.NewDefaultConnection(c, testutils.EnrichOptionsWithOptionsForTesting(t)...) - assert.Equalf(t, tt.wantOk, c.sendReset(tt.args.ctx, tt.args.ch, tt.args.cbusOptions, tt.args.requestContext, tt.args.sendOutErrorNotification), "sendReset(%v, %v, %v, %v, %v)", tt.args.ctx, tt.args.ch, tt.args.cbusOptions, tt.args.requestContext, tt.args.sendOutErrorNotification) + assert.Equalf(t, tt.wantOk, c.sendReset(tt.args.ctx, tt.args.ch, tt.args.sendOutErrorNotification), "sendReset(%v, %v, %v)", tt.args.ctx, tt.args.ch, tt.args.sendOutErrorNotification) }) } } @@ -1076,6 +1067,7 @@ func TestConnection_setInterface1PowerUpSettings(t *testing.T) { codec := NewMessageCodec(ti, _options...) require.NoError(t, codec.Connect(t.Context())) t.Cleanup(func() { + t.Log("disconnecting codec") assert.Error(t, codec.Disconnect()) }) fields.messageCodec = codec @@ -1306,7 +1298,7 @@ func TestConnection_setupConnection(t *testing.T) { args.ctx = testutils.TestContext(t) var cancelFunc context.CancelFunc - args.ctx, cancelFunc = context.WithTimeout(args.ctx, 2*time.Second) + args.ctx, cancelFunc = context.WithTimeout(args.ctx, 20*time.Second) t.Cleanup(cancelFunc) }, validator: func(t *testing.T, result plc4go.PlcConnectionConnectResult) { @@ -1644,7 +1636,7 @@ func TestConnection_setupConnection(t *testing.T) { args.ctx = testutils.TestContext(t) var cancelFunc context.CancelFunc - args.ctx, cancelFunc = context.WithTimeout(args.ctx, 2*time.Second) + args.ctx, cancelFunc = context.WithTimeout(args.ctx, 20*time.Second) t.Cleanup(cancelFunc) }, validator: func(t *testing.T, result plc4go.PlcConnectionConnectResult) { diff --git a/plc4go/internal/cbus/Driver_test.go b/plc4go/internal/cbus/Driver_test.go index ede88034df..0725d531ca 100644 --- a/plc4go/internal/cbus/Driver_test.go +++ b/plc4go/internal/cbus/Driver_test.go @@ -73,7 +73,7 @@ func TestDriver_GetConnection(t *testing.T) { setup: func(t *testing.T, fields *fields, args *args) { args.ctx = testutils.TestContext(t) var cancelFunc context.CancelFunc - args.ctx, cancelFunc = context.WithTimeout(args.ctx, 2*time.Second) + args.ctx, cancelFunc = context.WithTimeout(args.ctx, 20*time.Second) t.Cleanup(cancelFunc) }, wantVerifier: func(t *testing.T, results <-chan plc4go.PlcConnectionConnectResult) bool { @@ -108,7 +108,7 @@ func TestDriver_GetConnection(t *testing.T) { args.transports["test"] = test.NewTransport(testutils.EnrichOptionsWithOptionsForTesting(t)...) args.ctx = testutils.TestContext(t) var cancelFunc context.CancelFunc - args.ctx, cancelFunc = context.WithTimeout(args.ctx, 2*time.Second) + args.ctx, cancelFunc = context.WithTimeout(args.ctx, 20*time.Second) t.Cleanup(cancelFunc) }, wantVerifier: func(t *testing.T, results <-chan plc4go.PlcConnectionConnectResult) bool { @@ -144,7 +144,7 @@ func TestDriver_GetConnection(t *testing.T) { args.transports["test"] = test.NewTransport(testutils.EnrichOptionsWithOptionsForTesting(t)...) args.ctx = testutils.TestContext(t) var cancelFunc context.CancelFunc - args.ctx, cancelFunc = context.WithTimeout(args.ctx, 2*time.Second) + args.ctx, cancelFunc = context.WithTimeout(args.ctx, 20*time.Second) t.Cleanup(cancelFunc) }, wantVerifier: func(t *testing.T, results <-chan plc4go.PlcConnectionConnectResult) bool { @@ -177,7 +177,7 @@ func TestDriver_GetConnection(t *testing.T) { args.transports["test"] = test.NewTransport(testutils.EnrichOptionsWithOptionsForTesting(t)...) args.ctx = testutils.TestContext(t) var cancelFunc context.CancelFunc - args.ctx, cancelFunc = context.WithTimeout(args.ctx, 2*time.Second) + args.ctx, cancelFunc = context.WithTimeout(args.ctx, 20*time.Second) t.Cleanup(cancelFunc) }, wantVerifier: func(t *testing.T, results <-chan plc4go.PlcConnectionConnectResult) bool { diff --git a/plc4go/internal/cbus/MessageCodec.go b/plc4go/internal/cbus/MessageCodec.go index 7f6c18c14b..5325da1c33 100644 --- a/plc4go/internal/cbus/MessageCodec.go +++ b/plc4go/internal/cbus/MessageCodec.go @@ -137,16 +137,18 @@ func (m *MessageCodec) Receive(ctx context.Context, timeout time.Duration) (spi. if err := ti.FillBuffer( ctx, func(pos uint, currentByte byte, reader transports.ExtendedReader) bool { - m.log.Trace().Uint8("byte", currentByte).Msg("current byte") + m.log.Trace().Uint("pos", pos).Uint8("byte", currentByte).Str("rune", string(rune(currentByte))).Msg("current byte") switch currentByte { case readWriteModel.ResponseTermination_CR, readWriteModel.ResponseTermination_LF: + m.log.Trace().Msg("Found termination byte") return false case byte(readWriteModel.ConfirmationType_CONFIRMATION_SUCCESSFUL): confirmation = true // In case we have directly more data in the buffer after a confirmation _, err := reader.Peek(int(pos + 1)) + m.log.Trace().Err(err).Msg("Peeking one more") return err == nil case byte(readWriteModel.ConfirmationType_NOT_TRANSMITTED_TO_MANY_RE_TRANSMISSIONS), @@ -155,6 +157,7 @@ func (m *MessageCodec) Receive(ctx context.Context, timeout time.Duration) (spi. byte(readWriteModel.ConfirmationType_NOT_TRANSMITTED_TOO_LONG), byte(readWriteModel.ConfirmationType_CHECKSUM_FAILURE): confirmation = true + m.log.Trace().Msg("Found confirmation") return false default: return true diff --git a/plc4go/internal/cbus/MessageCodec_test.go b/plc4go/internal/cbus/MessageCodec_test.go index 012434cc09..09efc0ea3a 100644 --- a/plc4go/internal/cbus/MessageCodec_test.go +++ b/plc4go/internal/cbus/MessageCodec_test.go @@ -20,6 +20,7 @@ package cbus import ( + "context" "fmt" "testing" "time" @@ -117,6 +118,7 @@ func TestMessageCodec_Receive(t *testing.T) { monitoredSALs chan readWriteModel.MonitoredSAL } type args struct { + ctx context.Context timeout time.Duration } tests := []struct { @@ -137,6 +139,7 @@ func TestMessageCodec_Receive(t *testing.T) { monitoredSALs: nil, }, args: args{ + ctx: t.Context(), timeout: 10 * time.Second, }, setup: func(t *testing.T, fields *fields) { @@ -162,6 +165,7 @@ func TestMessageCodec_Receive(t *testing.T) { monitoredSALs: nil, }, args: args{ + ctx: t.Context(), timeout: 10 * time.Second, }, want: readWriteModel.NewCBusMessageToClient( @@ -193,6 +197,7 @@ func TestMessageCodec_Receive(t *testing.T) { monitoredSALs: nil, }, args: args{ + ctx: t.Context(), timeout: 10 * time.Second, }, setup: func(t *testing.T, fields *fields) { @@ -219,6 +224,7 @@ func TestMessageCodec_Receive(t *testing.T) { monitoredSALs: nil, }, args: args{ + ctx: t.Context(), timeout: 10 * time.Second, }, setup: func(t *testing.T, fields *fields) { @@ -245,6 +251,7 @@ func TestMessageCodec_Receive(t *testing.T) { monitoredSALs: nil, }, args: args{ + ctx: t.Context(), timeout: 10 * time.Second, }, setup: func(t *testing.T, fields *fields) { @@ -279,6 +286,7 @@ func TestMessageCodec_Receive(t *testing.T) { monitoredSALs: nil, }, args: args{ + ctx: t.Context(), timeout: 10 * time.Second, }, manipulator: func(t *testing.T, messageCodec *MessageCodec) { @@ -326,6 +334,7 @@ func TestMessageCodec_Receive(t *testing.T) { monitoredSALs: nil, }, args: args{ + ctx: t.Context(), timeout: 10 * time.Second, }, setup: func(t *testing.T, fields *fields) { @@ -523,6 +532,7 @@ func TestMessageCodec_Receive(t *testing.T) { monitoredSALs: nil, }, args: args{ + ctx: t.Context(), timeout: 10 * time.Second, }, manipulator: func(t *testing.T, messageCodec *MessageCodec) { @@ -603,7 +613,7 @@ func TestMessageCodec_Receive(t *testing.T) { if tt.manipulator != nil { tt.manipulator(t, m) } - got, err := m.Receive(t.Context(), tt.args.timeout) + got, err := m.Receive(tt.args.ctx, tt.args.timeout) if !tt.wantErr(t, err, fmt.Sprintf("Receive()")) { return } @@ -776,10 +786,10 @@ func Test_extractMMIAndSAL(t *testing.T) { message spi.Message } tests := []struct { - name string - args args - setup func(t *testing.T, args *args) - want bool + name string + args args + setup func(t *testing.T, args *args) + handled bool }{ { name: "extract it", @@ -818,6 +828,7 @@ func Test_extractMMIAndSAL(t *testing.T) { codec.monitoredSALs = make(chan readWriteModel.MonitoredSAL, 1) args.codec = codec }, + handled: true, }, } for _, tt := range tests { @@ -825,7 +836,7 @@ func Test_extractMMIAndSAL(t *testing.T) { if tt.setup != nil { tt.setup(t, &tt.args) } - assert.Equalf(t, tt.want, extractMMIAndSAL(testutils.ProduceTestingLogger(t))(t.Context(), tt.args.codec, tt.args.message), "extractMMIAndSAL(%v, %v)", tt.args.codec, tt.args.message) + assert.Equalf(t, tt.handled, extractMMIAndSAL(testutils.ProduceTestingLogger(t))(t.Context(), tt.args.codec, tt.args.message), "extractMMIAndSAL(%v, %v) to be handled", tt.args.codec, tt.args.message) }) } } diff --git a/plc4go/internal/cbus/Reader_test.go b/plc4go/internal/cbus/Reader_test.go index ac954a88d0..1466de5cec 100644 --- a/plc4go/internal/cbus/Reader_test.go +++ b/plc4go/internal/cbus/Reader_test.go @@ -102,7 +102,7 @@ func TestReader_Read(t *testing.T) { setup: func(t *testing.T, fields *fields, args *args) { args.ctx = testutils.TestContext(t) var cancelFunc context.CancelFunc - args.ctx, cancelFunc = context.WithTimeout(args.ctx, 2*time.Second) + args.ctx, cancelFunc = context.WithTimeout(args.ctx, 20*time.Second) t.Cleanup(cancelFunc) }, wantAsserter: func(t *testing.T, results <-chan apiModel.PlcReadRequestResult) bool { @@ -162,7 +162,7 @@ func TestReader_readSync(t *testing.T) { setup: func(t *testing.T, fields *fields, args *args) { args.ctx = testutils.TestContext(t) var cancelFunc context.CancelFunc - args.ctx, cancelFunc = context.WithTimeout(args.ctx, 2*time.Second) + args.ctx, cancelFunc = context.WithTimeout(args.ctx, 20*time.Second) t.Cleanup(cancelFunc) }, resultEvaluator: func(t *testing.T, results chan apiModel.PlcReadRequestResult) bool { @@ -214,7 +214,7 @@ func TestReader_readSync(t *testing.T) { }) args.ctx = testutils.TestContext(t) var cancelFunc context.CancelFunc - args.ctx, cancelFunc = context.WithTimeout(args.ctx, 2*time.Second) + args.ctx, cancelFunc = context.WithTimeout(args.ctx, 20*time.Second) t.Cleanup(cancelFunc) }, resultEvaluator: func(t *testing.T, results chan apiModel.PlcReadRequestResult) bool { @@ -242,7 +242,7 @@ func TestReader_readSync(t *testing.T) { setup: func(t *testing.T, fields *fields, args *args) { args.ctx = testutils.TestContext(t) var cancelFunc context.CancelFunc - args.ctx, cancelFunc = context.WithTimeout(args.ctx, 2*time.Second) + args.ctx, cancelFunc = context.WithTimeout(args.ctx, 20*time.Second) t.Cleanup(cancelFunc) }, resultEvaluator: func(t *testing.T, results chan apiModel.PlcReadRequestResult) bool { @@ -321,7 +321,7 @@ func TestReader_readSync(t *testing.T) { args.ctx = testutils.TestContext(t) var cancelFunc context.CancelFunc - args.ctx, cancelFunc = context.WithTimeout(args.ctx, 2*time.Second) + args.ctx, cancelFunc = context.WithTimeout(args.ctx, 20*time.Second) t.Cleanup(cancelFunc) }, resultEvaluator: func(t *testing.T, results chan apiModel.PlcReadRequestResult) bool { @@ -483,7 +483,7 @@ func TestReader_sendMessageOverTheWire(t *testing.T) { args.ctx = testutils.TestContext(t) var cancelFunc context.CancelFunc - args.ctx, cancelFunc = context.WithTimeout(args.ctx, 2*time.Second) + args.ctx, cancelFunc = context.WithTimeout(args.ctx, 20*time.Second) t.Cleanup(cancelFunc) }, }, @@ -644,7 +644,7 @@ func TestReader_sendMessageOverTheWire(t *testing.T) { args.ctx = testutils.TestContext(t) var cancelFunc context.CancelFunc - args.ctx, cancelFunc = context.WithTimeout(args.ctx, 2*time.Second) + args.ctx, cancelFunc = context.WithTimeout(args.ctx, 20*time.Second) t.Cleanup(cancelFunc) }, }, @@ -727,7 +727,7 @@ func TestReader_sendMessageOverTheWire(t *testing.T) { args.ctx = testutils.TestContext(t) var cancelFunc context.CancelFunc - args.ctx, cancelFunc = context.WithTimeout(args.ctx, 2*time.Second) + args.ctx, cancelFunc = context.WithTimeout(args.ctx, 20*time.Second) t.Cleanup(cancelFunc) }, }, @@ -810,7 +810,7 @@ func TestReader_sendMessageOverTheWire(t *testing.T) { args.ctx = testutils.TestContext(t) var cancelFunc context.CancelFunc - args.ctx, cancelFunc = context.WithTimeout(args.ctx, 2*time.Second) + args.ctx, cancelFunc = context.WithTimeout(args.ctx, 20*time.Second) t.Cleanup(cancelFunc) }, }, @@ -893,7 +893,7 @@ func TestReader_sendMessageOverTheWire(t *testing.T) { args.ctx = testutils.TestContext(t) var cancelFunc context.CancelFunc - args.ctx, cancelFunc = context.WithTimeout(args.ctx, 2*time.Second) + args.ctx, cancelFunc = context.WithTimeout(args.ctx, 20*time.Second) t.Cleanup(cancelFunc) }, }, @@ -976,7 +976,7 @@ func TestReader_sendMessageOverTheWire(t *testing.T) { args.ctx = testutils.TestContext(t) var cancelFunc context.CancelFunc - args.ctx, cancelFunc = context.WithTimeout(args.ctx, 2*time.Second) + args.ctx, cancelFunc = context.WithTimeout(args.ctx, 20*time.Second) t.Cleanup(cancelFunc) }, }, @@ -1059,7 +1059,7 @@ func TestReader_sendMessageOverTheWire(t *testing.T) { args.ctx = testutils.TestContext(t) var cancelFunc context.CancelFunc - args.ctx, cancelFunc = context.WithTimeout(args.ctx, 2*time.Second) + args.ctx, cancelFunc = context.WithTimeout(args.ctx, 20*time.Second) t.Cleanup(cancelFunc) }, }, @@ -1142,7 +1142,7 @@ func TestReader_sendMessageOverTheWire(t *testing.T) { args.ctx = testutils.TestContext(t) var cancelFunc context.CancelFunc - args.ctx, cancelFunc = context.WithTimeout(args.ctx, 2*time.Second) + args.ctx, cancelFunc = context.WithTimeout(args.ctx, 20*time.Second) t.Cleanup(cancelFunc) }, }, @@ -1161,12 +1161,11 @@ func TestReader_sendMessageOverTheWire(t *testing.T) { } m.sendMessageOverTheWire(tt.args.ctx, tt.args.transaction, tt.args.messageToSend, tt.args.addResponseCode(t), tt.args.tagName, tt.args.addPlcValue(t)) t.Log("Waiting now") - timer := time.NewTimer(10 * time.Second) select { + case <-t.Context().Done(): + t.Log("aborted") case <-ch: t.Log("Done waiting") - case <-timer.C: - t.Error("Timeout") } }) } diff --git a/plc4go/internal/cbus/Writer_test.go b/plc4go/internal/cbus/Writer_test.go index 3bacea5d17..83db24aece 100644 --- a/plc4go/internal/cbus/Writer_test.go +++ b/plc4go/internal/cbus/Writer_test.go @@ -86,7 +86,7 @@ func TestWriter_Write(t *testing.T) { setup: func(t *testing.T, fields *fields, args *args) { args.ctx = testutils.TestContext(t) var cancelFunc context.CancelFunc - args.ctx, cancelFunc = context.WithTimeout(args.ctx, 2*time.Second) + args.ctx, cancelFunc = context.WithTimeout(args.ctx, 20*time.Second) t.Cleanup(cancelFunc) }, wantAsserter: func(t *testing.T, results <-chan apiModel.PlcWriteRequestResult) bool { @@ -112,7 +112,7 @@ func TestWriter_Write(t *testing.T) { setup: func(t *testing.T, fields *fields, args *args) { args.ctx = testutils.TestContext(t) var cancelFunc context.CancelFunc - args.ctx, cancelFunc = context.WithTimeout(args.ctx, 2*time.Second) + args.ctx, cancelFunc = context.WithTimeout(args.ctx, 20*time.Second) t.Cleanup(cancelFunc) }, wantAsserter: func(t *testing.T, results <-chan apiModel.PlcWriteRequestResult) bool { @@ -162,7 +162,7 @@ func TestWriter_Write(t *testing.T) { setup: func(t *testing.T, fields *fields, args *args){ args.ctx = testutils.TestContext(t) var cancelFunc context.CancelFunc - args.ctx, cancelFunc = context.WithTimeout(args.ctx, 2*time.Second) + args.ctx, cancelFunc = context.WithTimeout(args.ctx, 20*time.Second) t.Cleanup(cancelFunc) }, wantAsserter: func(t *testing.T, results <-chan apiModel.PlcWriteRequestResult) bool { diff --git a/plc4go/spi/default/DefaultCodec.go b/plc4go/spi/default/DefaultCodec.go index f14d6efe31..7666f6f2b1 100644 --- a/plc4go/spi/default/DefaultCodec.go +++ b/plc4go/spi/default/DefaultCodec.go @@ -358,7 +358,10 @@ mainLoop: continue mainLoop } nextExpire := m.TimeoutExpectations(now) - workerLog.Debug().Dur("nextExpire", nextExpire).Msg("waiting for next expire") + m.expectationsChangeMutex.RLock() + numberOfExpectations = len(m.expectations) + m.expectationsChangeMutex.RUnlock() + workerLog.Debug().Dur("nextExpire", nextExpire).Int("numberOfExpectations", numberOfExpectations).Msg("waiting for next expire") timer := time.NewTimer(nextExpire) select { case <-m.notifyExpireWorker: @@ -471,7 +474,7 @@ mainLoop: workerLog.Trace().Msg("Executing custom handling") start := time.Now() handled := m.customMessageHandling(m.ctx, m.DefaultCodecRequirements, message) - workerLog.Trace().TimeDiff("elapsedTime", time.Now(), start).Msg("custom handling took elapsedTime") + workerLog.Trace().TimeDiff("elapsedTime", time.Now(), start).Bool("handled", handled).Msg("custom handling took elapsedTime") if handled { workerLog.Trace().Msg("Custom handling handled the message") continue mainLoop diff --git a/plc4go/spi/default/DefaultCodec_test.go b/plc4go/spi/default/DefaultCodec_test.go index a4b82fbcab..15ca290865 100644 --- a/plc4go/spi/default/DefaultCodec_test.go +++ b/plc4go/spi/default/DefaultCodec_test.go @@ -491,7 +491,7 @@ func Test_defaultCodec_Expect(t *testing.T) { setup: func(t *testing.T, fields *fields, args *args) { args.ctx = testutils.TestContext(t) var cancelFunc context.CancelFunc - args.ctx, cancelFunc = context.WithTimeout(args.ctx, 2*time.Second) + args.ctx, cancelFunc = context.WithTimeout(args.ctx, 20*time.Second) t.Cleanup(cancelFunc) }, }, @@ -883,7 +883,7 @@ func Test_defaultCodec_SendRequest(t *testing.T) { args.ctx = testutils.TestContext(t) var cancelFunc context.CancelFunc - args.ctx, cancelFunc = context.WithTimeout(args.ctx, 2*time.Second) + args.ctx, cancelFunc = context.WithTimeout(args.ctx, 20*time.Second) t.Cleanup(cancelFunc) }, wantErr: assert.NoError, @@ -908,7 +908,7 @@ func Test_defaultCodec_SendRequest(t *testing.T) { args.ctx = testutils.TestContext(t) var cancelFunc context.CancelFunc - args.ctx, cancelFunc = context.WithTimeout(args.ctx, 2*time.Second) + args.ctx, cancelFunc = context.WithTimeout(args.ctx, 20*time.Second) t.Cleanup(cancelFunc) }, wantErr: assert.Error, diff --git a/plc4go/spi/interceptors/SingleItemRequestInterceptor_test.go b/plc4go/spi/interceptors/SingleItemRequestInterceptor_test.go index 58c244400a..ed6f7032e0 100644 --- a/plc4go/spi/interceptors/SingleItemRequestInterceptor_test.go +++ b/plc4go/spi/interceptors/SingleItemRequestInterceptor_test.go @@ -129,7 +129,7 @@ func TestSingleItemRequestInterceptor_InterceptReadRequest(t *testing.T) { args.ctx = testutils.TestContext(t) var cancelFunc context.CancelFunc - args.ctx, cancelFunc = context.WithTimeout(args.ctx, 2*time.Second) + args.ctx, cancelFunc = context.WithTimeout(args.ctx, 20*time.Second) t.Cleanup(cancelFunc) }, wantAssert: func(t *testing.T, args args, got []apiModel.PlcReadRequest) bool { @@ -285,7 +285,7 @@ func TestSingleItemRequestInterceptor_InterceptWriteRequest(t *testing.T) { args.ctx = testutils.TestContext(t) var cancelFunc context.CancelFunc - args.ctx, cancelFunc = context.WithTimeout(args.ctx, 2*time.Second) + args.ctx, cancelFunc = context.WithTimeout(args.ctx, 20*time.Second) t.Cleanup(cancelFunc) }, wantAssert: func(t *testing.T, args args, got []apiModel.PlcWriteRequest) bool { @@ -445,7 +445,7 @@ func TestSingleItemRequestInterceptor_ProcessReadResponses(t *testing.T) { args.ctx = testutils.TestContext(t) var cancelFunc context.CancelFunc - args.ctx, cancelFunc = context.WithTimeout(args.ctx, 2*time.Second) + args.ctx, cancelFunc = context.WithTimeout(args.ctx, 20*time.Second) t.Cleanup(cancelFunc) }, wantAssert: func(t *testing.T, args args, got apiModel.PlcReadRequestResult) bool { @@ -476,7 +476,7 @@ func TestSingleItemRequestInterceptor_ProcessReadResponses(t *testing.T) { args.ctx = testutils.TestContext(t) var cancelFunc context.CancelFunc - args.ctx, cancelFunc = context.WithTimeout(args.ctx, 2*time.Second) + args.ctx, cancelFunc = context.WithTimeout(args.ctx, 20*time.Second) t.Cleanup(cancelFunc) }, wantAssert: func(t *testing.T, args args, got apiModel.PlcReadRequestResult) bool { @@ -626,7 +626,7 @@ func TestSingleItemRequestInterceptor_ProcessWriteResponses(t *testing.T) { args.ctx = testutils.TestContext(t) var cancelFunc context.CancelFunc - args.ctx, cancelFunc = context.WithTimeout(args.ctx, 2*time.Second) + args.ctx, cancelFunc = context.WithTimeout(args.ctx, 20*time.Second) t.Cleanup(cancelFunc) }, wantAssert: func(t *testing.T, args args, got apiModel.PlcWriteRequestResult) bool { @@ -655,7 +655,7 @@ func TestSingleItemRequestInterceptor_ProcessWriteResponses(t *testing.T) { args.ctx = testutils.TestContext(t) var cancelFunc context.CancelFunc - args.ctx, cancelFunc = context.WithTimeout(args.ctx, 2*time.Second) + args.ctx, cancelFunc = context.WithTimeout(args.ctx, 20*time.Second) t.Cleanup(cancelFunc) }, wantAssert: func(t *testing.T, args args, got apiModel.PlcWriteRequestResult) bool { diff --git a/plc4go/spi/transports/test/TransportInstance.go b/plc4go/spi/transports/test/TransportInstance.go index b937e7b608..2244a22529 100644 --- a/plc4go/spi/transports/test/TransportInstance.go +++ b/plc4go/spi/transports/test/TransportInstance.go @@ -20,6 +20,7 @@ package test import ( + "bufio" "context" "encoding/hex" "os" @@ -36,18 +37,19 @@ import ( ) type TransportInstance struct { - readChannel chan []byte - readBuffer []byte - writeBuffer []byte - transport *Transport + readChannel chan []byte + readBuffer []byte + writeBuffer []byte + dataMutex sync.RWMutex + + transport *Transport + writeInterceptor func(transportInstance *TransportInstance, data []byte) + simulatedLatency time.Duration - dataMutex sync.RWMutex connected atomic.Bool stateChangeMutex sync.RWMutex - simulatedLatency time.Duration - log zerolog.Logger } @@ -151,11 +153,7 @@ func (m *TransportInstance) GetNumBytesAvailableInBuffer() (uint32, error) { if !m.IsConnected() { panic(errors.New("working on a unconnected connection")) } - m.dataMutex.RLock() - readableBytes := len(m.readBuffer) - m.dataMutex.RUnlock() - m.log.Trace().Int("readableBytes", readableBytes).Msg("return number of readable bytes") - return uint32(readableBytes), nil + return m.availableBytes(), nil } func (m *TransportInstance) FillBuffer(ctx context.Context, until func(pos uint, currentByte byte, reader transports.ExtendedReader) (keepGoing bool), timeout time.Duration) error { @@ -220,19 +218,12 @@ func (m *TransportInstance) PeekReadableBytes(ctx context.Context, numBytes uint var err error if availableBytes < numBytes { m.log.Trace().Msg("not enough bytes available") - err = errors.New("not enough bytes available") - } else { - m.log.Trace().Msg("enough bytes available") - } - if availableBytes == 0 { - m.log.Trace().Msg("No bytes available") - return nil, err + return m.readBuffer[:], bufio.ErrBufferFull } - m.dataMutex.RLock() - peekAble := m.readBuffer[0:availableBytes] - m.dataMutex.RUnlock() + m.log.Trace().Msg("enough bytes available") + peekAble := m.peek() m.log.Trace().Int("peekAbleLen", len(peekAble)).Msg("New buffer size peekAbleLen") - return peekAble, err + return peekAble[:numBytes], err } func (m *TransportInstance) Read(ctx context.Context, numBytes uint32, timeout time.Duration) ([]byte, error) { @@ -270,12 +261,7 @@ func (m *TransportInstance) Read(ctx context.Context, numBytes uint32, timeout t case <-ctx.Done(): case <-timer.C: } - m.dataMutex.RLock() - data := m.readBuffer[0:int(numBytes)] - m.readBuffer = m.readBuffer[int(numBytes):] - m.dataMutex.RUnlock() - m.log.Trace().Uint32("availableBytes", availableBytes).Msg("New buffer size availableBytes") - return data, nil + return m.read(int(numBytes)), nil } func (m *TransportInstance) SetWriteInterceptor(writeInterceptor func(transportInstance *TransportInstance, data []byte)) { @@ -370,9 +356,30 @@ func (m *TransportInstance) availableBytes() uint32 { return uint32(len(m.readBuffer)) } +func (m *TransportInstance) peek() []byte { + m.dataMutex.RLock() + defer m.dataMutex.RUnlock() + return m.readBuffer[0:len(m.readBuffer)] +} + +func (m *TransportInstance) read(numBytes int) []byte { + m.dataMutex.Lock() + defer m.dataMutex.Unlock() + data := m.readBuffer[0:int(numBytes)] + m.readBuffer = m.readBuffer[int(numBytes):] + return data +} + +func (m *TransportInstance) appendRead(newBytes ...byte) (totalAvailableBytes uint32) { + m.dataMutex.Lock() + defer m.dataMutex.Unlock() + m.readBuffer = append(m.readBuffer, newBytes...) + return uint32(len(m.readBuffer)) +} + func (m *TransportInstance) transferFromChannel(ctx context.Context, timeout time.Duration) (totalAvailableBytes uint32) { m.log.Trace().Dur("timeout", timeout).Msg("Transfer from channel") - m.dataMutex.Lock() + totalAvailableBytes = m.availableBytes() timer := time.NewTimer(timeout) start := time.Now() select { @@ -382,9 +389,7 @@ func (m *TransportInstance) transferFromChannel(ctx context.Context, timeout tim m.log.Trace().Msg("Timeout") case newBytes := <-m.readChannel: m.log.Trace().Dur("time", time.Since(start)).Msg("Got new bytes") - m.readBuffer = append(m.readBuffer, newBytes...) + totalAvailableBytes = m.appendRead(newBytes...) } - totalAvailableBytes = uint32(len(m.readBuffer)) - m.dataMutex.Unlock() return totalAvailableBytes } diff --git a/plc4go/spi/transports/test/TransportInstance_test.go b/plc4go/spi/transports/test/TransportInstance_test.go index b7ffe17d64..ed32c26fe3 100644 --- a/plc4go/spi/transports/test/TransportInstance_test.go +++ b/plc4go/spi/transports/test/TransportInstance_test.go @@ -270,6 +270,7 @@ func TestTransportInstance_FillReadBuffer(t *testing.T) { writeBuffer: tt.fields.writeBuffer, transport: tt.fields.transport, writeInterceptor: tt.fields.writeInterceptor, + readChannel: make(chan []byte, 1), } if tt.manipulator != nil { tt.manipulator(t, m) diff --git a/plc4go/spi/transports/test/Transport_test.go b/plc4go/spi/transports/test/Transport_test.go index 45a2a33d19..22e089cbba 100644 --- a/plc4go/spi/transports/test/Transport_test.go +++ b/plc4go/spi/transports/test/Transport_test.go @@ -26,6 +26,7 @@ import ( "github.com/rs/zerolog/log" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "github.com/apache/plc4x/plc4go/spi/transports" ) @@ -106,23 +107,24 @@ func TestTransport_CreateTransportInstance(t *testing.T) { options map[string][]string } tests := []struct { - name string - fields fields - args args - want transports.TransportInstance - wantErr bool + name string + fields fields + args args + wantAssert func(*testing.T, transports.TransportInstance) bool + wantErr bool }{ { name: "create it", fields: fields{ preregisteredInstances: map[url.URL]transports.TransportInstance{}, }, - want: &TransportInstance{ - readBuffer: []byte{}, - writeBuffer: []byte{}, - transport: NewTransport(), - simulatedLatency: 100 * time.Millisecond, - log: log.Logger, + wantAssert: func(t *testing.T, instance transports.TransportInstance) bool { + require.NotNil(t, instance) + assert.IsType(t, &TransportInstance{}, instance) + ti := instance.(*TransportInstance) + assert.NotNil(t, ti.transport) + assert.Equal(t, 10*time.Millisecond, ti.simulatedLatency) + return true }, }, { @@ -135,7 +137,9 @@ func TestTransport_CreateTransportInstance(t *testing.T) { args: args{ transportUrl: url.URL{Host: "abcdefg"}, }, - want: nil, + wantAssert: func(t *testing.T, instance transports.TransportInstance) bool { + return assert.Nil(t, instance) + }, }, { name: "fail it on purpose", @@ -144,6 +148,9 @@ func TestTransport_CreateTransportInstance(t *testing.T) { "failTestTransport": {"yes please"}, }, }, + wantAssert: func(t *testing.T, instance transports.TransportInstance) bool { + return assert.Nil(t, instance) + }, wantErr: true, }, } @@ -158,8 +165,8 @@ func TestTransport_CreateTransportInstance(t *testing.T) { t.Errorf("CreateTransportInstance() error = %v, wantErr %v", err, tt.wantErr) return } - if !assert.Equal(t, tt.want, got) { - t.Errorf("CreateTransportInstance() got = %v, want %v", got, tt.want) + if !assert.True(t, tt.wantAssert(t, got)) { + t.Errorf("CreateTransportInstance() got = %v", got) } }) } diff --git a/plc4go/spi/transports/test/transportInstanceDrivenExtendedReader.go b/plc4go/spi/transports/test/transportInstanceDrivenExtendedReader.go index 0995fc113b..9fe0864bd1 100644 --- a/plc4go/spi/transports/test/transportInstanceDrivenExtendedReader.go +++ b/plc4go/spi/transports/test/transportInstanceDrivenExtendedReader.go @@ -47,9 +47,7 @@ func (t *transportInstanceDrivenExtendedReader) Read(p []byte) (n int, err error func (t *transportInstanceDrivenExtendedReader) ReadByte() (byte, error) { numBytes := uint32(1) - t.dataMutex.RLock() availableBytes := t.availableBytes() - t.dataMutex.RUnlock() if availableBytes < numBytes { t.log.Trace().Uint32("numBytes", numBytes).Uint32("availableBytes", availableBytes).Msg("Trying transfer now") availableBytes = t.transferFromChannel(t.ctx, t.timeout) @@ -61,9 +59,7 @@ func (t *transportInstanceDrivenExtendedReader) ReadByte() (byte, error) { func (t *transportInstanceDrivenExtendedReader) Peek(n int) ([]byte, error) { numBytes := uint32(n) - t.dataMutex.RLock() availableBytes := t.availableBytes() - t.dataMutex.RUnlock() if availableBytes < numBytes { t.log.Trace().Uint32("numBytes", numBytes).Uint32("availableBytes", availableBytes).Msg("Trying transfer now") availableBytes = t.transferFromChannel(t.ctx, t.timeout)
