This is an automated email from the ASF dual-hosted git repository.

sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git


The following commit(s) were added to refs/heads/develop by this push:
     new 80a0d8ef1b test(plc4go/cbus): fix concurrency issue in test.
80a0d8ef1b is described below

commit 80a0d8ef1b6e83b10fac637d87f87aec91ebe088
Author: Sebastian Rühl <[email protected]>
AuthorDate: Fri Jun 2 10:18:52 2023 +0200

    test(plc4go/cbus): fix concurrency issue in test.
    
    Transition away from using a WaitGroup on addResponseCode or addPlcValue and
hook a chan close into the end of the transaction (EndRequest or FailRequest),
as this is really the marker which shows that the transaction has ended (which
is what we are looking for at the end). This has two benefits: 1. no more flaky
tests, and 2. if we forget to properly end a transaction we will notice and need
to fix that in prod code.
---
 plc4go/internal/cbus/Reader.go      |   5 +-
 plc4go/internal/cbus/Reader_test.go | 146 +++++++++++++++++-------------------
 2 files changed, 73 insertions(+), 78 deletions(-)

diff --git a/plc4go/internal/cbus/Reader.go b/plc4go/internal/cbus/Reader.go
index 0b3cedc4ff..2382a20e1b 100644
--- a/plc4go/internal/cbus/Reader.go
+++ b/plc4go/internal/cbus/Reader.go
@@ -24,6 +24,7 @@ import (
        "github.com/apache/plc4x/plc4go/spi/options"
        "github.com/apache/plc4x/plc4go/spi/transactions"
        "github.com/rs/zerolog"
+       "github.com/rs/zerolog/log"
        "sync"
        "time"
 
@@ -200,7 +201,9 @@ func (m *Reader) sendMessageOverTheWire(ctx 
context.Context, transaction transac
                // TODO: check if we can use a plcValueSerializer
                encodedReply := 
embeddedReply.GetReply().(readWriteModel.ReplyEncodedReply).GetEncodedReply()
                if err := MapEncodedReply(m.log, transaction, encodedReply, 
tagName, addResponseCode, addPlcValue); err != nil {
-                       return errors.Wrap(err, "error encoding reply")
+                       log.Error().Err(err).Msg("error encoding reply")
+                       addResponseCode(tagName, 
apiModel.PlcResponseCode_INTERNAL_ERROR)
+                       return transaction.EndRequest()
                }
                return transaction.EndRequest()
        }, func(err error) error {
diff --git a/plc4go/internal/cbus/Reader_test.go 
b/plc4go/internal/cbus/Reader_test.go
index bb93973550..b5baf829e0 100644
--- a/plc4go/internal/cbus/Reader_test.go
+++ b/plc4go/internal/cbus/Reader_test.go
@@ -29,12 +29,12 @@ import (
        "github.com/apache/plc4x/plc4go/spi/testutils"
        "github.com/apache/plc4x/plc4go/spi/transactions"
        "github.com/apache/plc4x/plc4go/spi/transports/test"
+       "github.com/apache/plc4x/plc4go/spi/utils"
        "github.com/pkg/errors"
        "github.com/stretchr/testify/assert"
        "github.com/stretchr/testify/mock"
        "net/url"
        "strings"
-       "sync"
        "sync/atomic"
        "testing"
        "time"
@@ -405,16 +405,15 @@ func TestReader_sendMessageOverTheWire(t *testing.T) {
                ctx             context.Context
                transaction     transactions.RequestTransaction
                messageToSend   readWriteModel.CBusMessage
-               addResponseCode func(t *testing.T, wg *sync.WaitGroup) 
func(name string, responseCode apiModel.PlcResponseCode)
+               addResponseCode func(t *testing.T) func(name string, 
responseCode apiModel.PlcResponseCode)
                tagName         string
-               addPlcValue     func(t *testing.T, wg *sync.WaitGroup) 
func(name string, plcValue apiValues.PlcValue)
+               addPlcValue     func(t *testing.T) func(name string, plcValue 
apiValues.PlcValue)
        }
        tests := []struct {
                name   string
                fields fields
                args   args
-               setup  func(t *testing.T, fields *fields, args *args)
-               wg     *sync.WaitGroup
+               setup  func(t *testing.T, fields *fields, args *args, ch chan 
struct{})
        }{
                {
                        name: "Send message empty message",
@@ -428,23 +427,21 @@ func TestReader_sendMessageOverTheWire(t *testing.T) {
                                        return timeout
                                }(),
                                messageToSend: nil,
-                               addResponseCode: func(t *testing.T, wg 
*sync.WaitGroup) func(name string, responseCode apiModel.PlcResponseCode) {
+                               addResponseCode: func(t *testing.T) func(name 
string, responseCode apiModel.PlcResponseCode) {
                                        return func(name string, responseCode 
apiModel.PlcResponseCode) {
                                                t.Logf("Got response code %s 
for %s", responseCode, name)
                                                assert.Equal(t, "horst", name)
                                                assert.Equal(t, 
apiModel.PlcResponseCode_INTERNAL_ERROR, responseCode)
-                                               wg.Done()
                                        }
                                },
                                tagName: "horst",
-                               addPlcValue: func(t *testing.T, wg 
*sync.WaitGroup) func(name string, plcValue apiValues.PlcValue) {
+                               addPlcValue: func(t *testing.T) func(name 
string, plcValue apiValues.PlcValue) {
                                        return func(name string, plcValue 
apiValues.PlcValue) {
                                                t.Logf("Got response %s for 
%s", plcValue, name)
-                                               wg.Done()
                                        }
                                },
                        },
-                       setup: func(t *testing.T, fields *fields, args *args) {
+                       setup: func(t *testing.T, fields *fields, args *args, 
ch chan struct{}) {
                                testutils.SetToTestingLogger(t, 
readWriteModel.Plc4xModelLog)
 
                                loggerOption := 
options.WithCustomLogger(testutils.ProduceTestingLogger(t))
@@ -471,10 +468,11 @@ func TestReader_sendMessageOverTheWire(t *testing.T) {
 
                                transaction := NewMockRequestTransaction(t)
                                expect := transaction.EXPECT()
-                               
expect.FailRequest(mock.Anything).Return(errors.New("no I say"))
+                               
expect.FailRequest(mock.Anything).Return(errors.New("no I say")).Run(func(_ 
error) {
+                                       close(ch)
+                               })
                                args.transaction = transaction
                        },
-                       wg: &sync.WaitGroup{},
                },
                {
                        name: "Send message which responds with message to 
client",
@@ -543,31 +541,30 @@ func TestReader_sendMessageOverTheWire(t *testing.T) {
                                        nil,
                                        nil,
                                ),
-                               addResponseCode: func(t *testing.T, wg 
*sync.WaitGroup) func(name string, responseCode apiModel.PlcResponseCode) {
+                               addResponseCode: func(t *testing.T) func(name 
string, responseCode apiModel.PlcResponseCode) {
                                        return func(name string, responseCode 
apiModel.PlcResponseCode) {
                                                t.Logf("Got response code %s 
for %s", responseCode, name)
                                                assert.Equal(t, "horst", name)
                                                assert.Equal(t, 
apiModel.PlcResponseCode_REQUEST_TIMEOUT, responseCode)
-                                               wg.Done()
                                        }
                                },
                                tagName: "horst",
-                               addPlcValue: func(t *testing.T, wg 
*sync.WaitGroup) func(name string, plcValue apiValues.PlcValue) {
+                               addPlcValue: func(t *testing.T) func(name 
string, plcValue apiValues.PlcValue) {
                                        return func(name string, plcValue 
apiValues.PlcValue) {
                                                t.Logf("Got response %s for 
%s", plcValue, name)
-                                               wg.Done()
                                        }
                                },
                        },
-                       setup: func(t *testing.T, fields *fields, args *args) {
+                       setup: func(t *testing.T, fields *fields, args *args, 
ch chan struct{}) {
                                testutils.SetToTestingLogger(t, 
readWriteModel.Plc4xModelLog)
 
                                transaction := NewMockRequestTransaction(t)
                                expect := transaction.EXPECT()
-                               
expect.FailRequest(mock.Anything).Return(errors.New("Nope"))
+                               
expect.FailRequest(mock.Anything).Return(errors.New("Nope")).Run(func(_ error) {
+                                       close(ch)
+                               })
                                args.transaction = transaction
                        },
-                       wg: &sync.WaitGroup{},
                },
                {
                        name: "Send message which responds with server error",
@@ -636,31 +633,30 @@ func TestReader_sendMessageOverTheWire(t *testing.T) {
                                        nil,
                                        nil,
                                ),
-                               addResponseCode: func(t *testing.T, wg 
*sync.WaitGroup) func(name string, responseCode apiModel.PlcResponseCode) {
+                               addResponseCode: func(t *testing.T) func(name 
string, responseCode apiModel.PlcResponseCode) {
                                        return func(name string, responseCode 
apiModel.PlcResponseCode) {
                                                t.Logf("Got response code %s 
for %s", responseCode, name)
                                                assert.Equal(t, "horst", name)
                                                assert.Equal(t, 
apiModel.PlcResponseCode_INVALID_DATA, responseCode)
-                                               wg.Done()
                                        }
                                },
                                tagName: "horst",
-                               addPlcValue: func(t *testing.T, wg 
*sync.WaitGroup) func(name string, plcValue apiValues.PlcValue) {
+                               addPlcValue: func(t *testing.T) func(name 
string, plcValue apiValues.PlcValue) {
                                        return func(name string, plcValue 
apiValues.PlcValue) {
                                                t.Logf("Got response %s for 
%s", plcValue, name)
-                                               wg.Done()
                                        }
                                },
                        },
-                       setup: func(t *testing.T, fields *fields, args *args) {
+                       setup: func(t *testing.T, fields *fields, args *args, 
ch chan struct{}) {
                                testutils.SetToTestingLogger(t, 
readWriteModel.Plc4xModelLog)
 
                                transaction := NewMockRequestTransaction(t)
                                expect := transaction.EXPECT()
-                               expect.EndRequest().Return(nil)
+                               expect.EndRequest().Return(nil).Run(func() {
+                                       close(ch)
+                               })
                                args.transaction = transaction
                        },
-                       wg: &sync.WaitGroup{},
                },
                {
                        name: "Send message which responds with too many 
retransmissions",
@@ -692,26 +688,26 @@ func TestReader_sendMessageOverTheWire(t *testing.T) {
                                        nil,
                                        nil,
                                ),
-                               addResponseCode: func(t *testing.T, wg 
*sync.WaitGroup) func(name string, responseCode apiModel.PlcResponseCode) {
+                               addResponseCode: func(t *testing.T) func(name 
string, responseCode apiModel.PlcResponseCode) {
                                        return func(name string, responseCode 
apiModel.PlcResponseCode) {
                                                t.Logf("Got response code %s 
for %s", responseCode, name)
                                                assert.Equal(t, "horst", name)
                                                assert.Equal(t, 
apiModel.PlcResponseCode_REMOTE_ERROR, responseCode)
-                                               wg.Done()
                                        }
                                },
                                tagName: "horst",
-                               addPlcValue: func(t *testing.T, wg 
*sync.WaitGroup) func(name string, plcValue apiValues.PlcValue) {
+                               addPlcValue: func(t *testing.T) func(name 
string, plcValue apiValues.PlcValue) {
                                        return func(name string, plcValue 
apiValues.PlcValue) {
                                                t.Logf("Got response %s for 
%s", plcValue, name)
-                                               wg.Done()
                                        }
                                },
                        },
-                       setup: func(t *testing.T, fields *fields, args *args) {
+                       setup: func(t *testing.T, fields *fields, args *args, 
ch chan struct{}) {
                                transaction := NewMockRequestTransaction(t)
                                expect := transaction.EXPECT()
-                               expect.EndRequest().Return(nil)
+                               expect.EndRequest().Return(nil).Run(func() {
+                                       close(ch)
+                               })
                                args.transaction = transaction
 
                                // Setup logger
@@ -759,7 +755,6 @@ func TestReader_sendMessageOverTheWire(t *testing.T) {
                                }
                                fields.messageCodec = codec
                        },
-                       wg: &sync.WaitGroup{},
                },
                {
                        name: "Send message which responds with corruption",
@@ -791,26 +786,26 @@ func TestReader_sendMessageOverTheWire(t *testing.T) {
                                        nil,
                                        nil,
                                ),
-                               addResponseCode: func(t *testing.T, wg 
*sync.WaitGroup) func(name string, responseCode apiModel.PlcResponseCode) {
+                               addResponseCode: func(t *testing.T) func(name 
string, responseCode apiModel.PlcResponseCode) {
                                        return func(name string, responseCode 
apiModel.PlcResponseCode) {
                                                t.Logf("Got response code %s 
for %s", responseCode, name)
                                                assert.Equal(t, "horst", name)
                                                assert.Equal(t, 
apiModel.PlcResponseCode_INVALID_DATA, responseCode)
-                                               wg.Done()
                                        }
                                },
                                tagName: "horst",
-                               addPlcValue: func(t *testing.T, wg 
*sync.WaitGroup) func(name string, plcValue apiValues.PlcValue) {
+                               addPlcValue: func(t *testing.T) func(name 
string, plcValue apiValues.PlcValue) {
                                        return func(name string, plcValue 
apiValues.PlcValue) {
                                                t.Logf("Got response %s for 
%s", plcValue, name)
-                                               wg.Done()
                                        }
                                },
                        },
-                       setup: func(t *testing.T, fields *fields, args *args) {
+                       setup: func(t *testing.T, fields *fields, args *args, 
ch chan struct{}) {
                                transaction := NewMockRequestTransaction(t)
                                expect := transaction.EXPECT()
-                               expect.EndRequest().Return(nil)
+                               expect.EndRequest().Return(nil).Run(func() {
+                                       close(ch)
+                               })
                                args.transaction = transaction
 
                                // Setup logger
@@ -858,7 +853,6 @@ func TestReader_sendMessageOverTheWire(t *testing.T) {
                                }
                                fields.messageCodec = codec
                        },
-                       wg: &sync.WaitGroup{},
                },
                {
                        name: "Send message which responds with sync loss",
@@ -890,26 +884,26 @@ func TestReader_sendMessageOverTheWire(t *testing.T) {
                                        nil,
                                        nil,
                                ),
-                               addResponseCode: func(t *testing.T, wg 
*sync.WaitGroup) func(name string, responseCode apiModel.PlcResponseCode) {
+                               addResponseCode: func(t *testing.T) func(name 
string, responseCode apiModel.PlcResponseCode) {
                                        return func(name string, responseCode 
apiModel.PlcResponseCode) {
                                                t.Logf("Got response code %s 
for %s", responseCode, name)
                                                assert.Equal(t, "horst", name)
                                                assert.Equal(t, 
apiModel.PlcResponseCode_REMOTE_BUSY, responseCode)
-                                               wg.Done()
                                        }
                                },
                                tagName: "horst",
-                               addPlcValue: func(t *testing.T, wg 
*sync.WaitGroup) func(name string, plcValue apiValues.PlcValue) {
+                               addPlcValue: func(t *testing.T) func(name 
string, plcValue apiValues.PlcValue) {
                                        return func(name string, plcValue 
apiValues.PlcValue) {
                                                t.Logf("Got response %s for 
%s", plcValue, name)
-                                               wg.Done()
                                        }
                                },
                        },
-                       setup: func(t *testing.T, fields *fields, args *args) {
+                       setup: func(t *testing.T, fields *fields, args *args, 
ch chan struct{}) {
                                transaction := NewMockRequestTransaction(t)
                                expect := transaction.EXPECT()
-                               expect.EndRequest().Return(nil)
+                               expect.EndRequest().Return(nil).Run(func() {
+                                       close(ch)
+                               })
                                args.transaction = transaction
 
                                // Setup logger
@@ -957,7 +951,6 @@ func TestReader_sendMessageOverTheWire(t *testing.T) {
                                }
                                fields.messageCodec = codec
                        },
-                       wg: &sync.WaitGroup{},
                },
                {
                        name: "Send message which responds with too long",
@@ -989,26 +982,26 @@ func TestReader_sendMessageOverTheWire(t *testing.T) {
                                        nil,
                                        nil,
                                ),
-                               addResponseCode: func(t *testing.T, wg 
*sync.WaitGroup) func(name string, responseCode apiModel.PlcResponseCode) {
+                               addResponseCode: func(t *testing.T) func(name 
string, responseCode apiModel.PlcResponseCode) {
                                        return func(name string, responseCode 
apiModel.PlcResponseCode) {
                                                t.Logf("Got response code %s 
for %s", responseCode, name)
                                                assert.Equal(t, "horst", name)
                                                assert.Equal(t, 
apiModel.PlcResponseCode_INVALID_DATA, responseCode)
-                                               wg.Done()
                                        }
                                },
                                tagName: "horst",
-                               addPlcValue: func(t *testing.T, wg 
*sync.WaitGroup) func(name string, plcValue apiValues.PlcValue) {
+                               addPlcValue: func(t *testing.T) func(name 
string, plcValue apiValues.PlcValue) {
                                        return func(name string, plcValue 
apiValues.PlcValue) {
                                                t.Logf("Got response %s for 
%s", plcValue, name)
-                                               wg.Done()
                                        }
                                },
                        },
-                       setup: func(t *testing.T, fields *fields, args *args) {
+                       setup: func(t *testing.T, fields *fields, args *args, 
ch chan struct{}) {
                                transaction := NewMockRequestTransaction(t)
                                expect := transaction.EXPECT()
-                               expect.EndRequest().Return(nil)
+                               expect.EndRequest().Return(nil).Run(func() {
+                                       close(ch)
+                               })
                                args.transaction = transaction
 
                                // Setup logger
@@ -1056,7 +1049,6 @@ func TestReader_sendMessageOverTheWire(t *testing.T) {
                                }
                                fields.messageCodec = codec
                        },
-                       wg: &sync.WaitGroup{},
                },
                {
                        name: "Send message which responds with confirm only",
@@ -1088,26 +1080,26 @@ func TestReader_sendMessageOverTheWire(t *testing.T) {
                                        nil,
                                        nil,
                                ),
-                               addResponseCode: func(t *testing.T, wg 
*sync.WaitGroup) func(name string, responseCode apiModel.PlcResponseCode) {
+                               addResponseCode: func(t *testing.T) func(name 
string, responseCode apiModel.PlcResponseCode) {
                                        return func(name string, responseCode 
apiModel.PlcResponseCode) {
                                                t.Logf("Got response code %s 
for %s", responseCode, name)
                                                assert.Equal(t, "horst", name)
                                                assert.Equal(t, 
apiModel.PlcResponseCode_NOT_FOUND, responseCode)
-                                               wg.Done()
                                        }
                                },
                                tagName: "horst",
-                               addPlcValue: func(t *testing.T, wg 
*sync.WaitGroup) func(name string, plcValue apiValues.PlcValue) {
+                               addPlcValue: func(t *testing.T) func(name 
string, plcValue apiValues.PlcValue) {
                                        return func(name string, plcValue 
apiValues.PlcValue) {
                                                t.Logf("Got response %s for 
%s", plcValue, name)
-                                               wg.Done()
                                        }
                                },
                        },
-                       setup: func(t *testing.T, fields *fields, args *args) {
+                       setup: func(t *testing.T, fields *fields, args *args, 
ch chan struct{}) {
                                transaction := NewMockRequestTransaction(t)
                                expect := transaction.EXPECT()
-                               expect.EndRequest().Return(nil)
+                               expect.EndRequest().Return(nil).Run(func() {
+                                       close(ch)
+                               })
                                args.transaction = transaction
 
                                // Setup logger
@@ -1155,7 +1147,6 @@ func TestReader_sendMessageOverTheWire(t *testing.T) {
                                }
                                fields.messageCodec = codec
                        },
-                       wg: &sync.WaitGroup{},
                },
                {
                        name: "Send message which responds with ok",
@@ -1187,26 +1178,26 @@ func TestReader_sendMessageOverTheWire(t *testing.T) {
                                        nil,
                                        nil,
                                ),
-                               addResponseCode: func(t *testing.T, wg 
*sync.WaitGroup) func(name string, responseCode apiModel.PlcResponseCode) {
+                               addResponseCode: func(t *testing.T) func(name 
string, responseCode apiModel.PlcResponseCode) {
                                        return func(name string, responseCode 
apiModel.PlcResponseCode) {
                                                t.Logf("Got response code %s 
for %s", responseCode, name)
                                                assert.Equal(t, "horst", name)
                                                assert.Equal(t, 
apiModel.PlcResponseCode_OK, responseCode)
-                                               wg.Done()
                                        }
                                },
                                tagName: "horst",
-                               addPlcValue: func(t *testing.T, wg 
*sync.WaitGroup) func(name string, plcValue apiValues.PlcValue) {
+                               addPlcValue: func(t *testing.T) func(name 
string, plcValue apiValues.PlcValue) {
                                        return func(name string, plcValue 
apiValues.PlcValue) {
                                                t.Logf("Got response %s for 
%s", plcValue, name)
-                                               wg.Done()
                                        }
                                },
                        },
-                       setup: func(t *testing.T, fields *fields, args *args) {
+                       setup: func(t *testing.T, fields *fields, args *args, 
ch chan struct{}) {
                                transaction := NewMockRequestTransaction(t)
                                expect := transaction.EXPECT()
-                               expect.EndRequest().Return(nil)
+                               expect.EndRequest().Return(nil).Run(func() {
+                                       close(ch)
+                               })
                                args.transaction = transaction
 
                                // Setup logger
@@ -1254,28 +1245,29 @@ func TestReader_sendMessageOverTheWire(t *testing.T) {
                                }
                                fields.messageCodec = codec
                        },
-                       wg: func() *sync.WaitGroup {
-                               wg := &sync.WaitGroup{}
-                               wg.Add(1) // We getting an response and a value
-                               return wg
-                       }(),
                },
        }
        for _, tt := range tests {
                t.Run(tt.name, func(t *testing.T) {
+                       ch := make(chan struct{})
                        if tt.setup != nil {
-                               tt.setup(t, &tt.fields, &tt.args)
+                               tt.setup(t, &tt.fields, &tt.args, ch)
                        }
                        m := &Reader{
                                alphaGenerator: tt.fields.alphaGenerator,
                                messageCodec:   tt.fields.messageCodec,
                                tm:             tt.fields.tm,
                        }
-                       tt.wg.Add(1)
-                       m.sendMessageOverTheWire(tt.args.ctx, 
tt.args.transaction, tt.args.messageToSend, tt.args.addResponseCode(t, tt.wg), 
tt.args.tagName, tt.args.addPlcValue(t, tt.wg))
+                       m.sendMessageOverTheWire(tt.args.ctx, 
tt.args.transaction, tt.args.messageToSend, tt.args.addResponseCode(t), 
tt.args.tagName, tt.args.addPlcValue(t))
                        t.Log("Waiting now")
-                       tt.wg.Wait() // TODO: we need to timeout this too
-                       t.Log("Done waiting")
+                       timer := time.NewTimer(3 * time.Second)
+                       defer utils.CleanupTimer(timer)
+                       select {
+                       case <-ch:
+                               t.Log("Done waiting")
+                       case <-timer.C:
+                               t.Error("Timeout")
+                       }
                })
        }
 }

Reply via email to