This is an automated email from the ASF dual-hosted git repository.
sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git
The following commit(s) were added to refs/heads/develop by this push:
new 80a0d8ef1b test(plc4go/cbus): fix concurrency issue in test.
80a0d8ef1b is described below
commit 80a0d8ef1b6e83b10fac637d87f87aec91ebe088
Author: Sebastian Rühl <[email protected]>
AuthorDate: Fri Jun 2 10:18:52 2023 +0200
test(plc4go/cbus): fix concurrency issue in test.
Transition away from using a WaitGroup in addResponseCode or addPlcValue and
instead close a channel when the transaction ends via EndRequest or FailRequest,
as this is really the marker that indicates whether the transaction has ended
(which is what we are looking for at the end). This has two benefits: 1. no more
flaky tests, and 2. if we forget to properly end a transaction, the test times
out, so we will notice and have to fix that in prod code.
---
plc4go/internal/cbus/Reader.go | 5 +-
plc4go/internal/cbus/Reader_test.go | 146 +++++++++++++++++-------------------
2 files changed, 73 insertions(+), 78 deletions(-)
diff --git a/plc4go/internal/cbus/Reader.go b/plc4go/internal/cbus/Reader.go
index 0b3cedc4ff..2382a20e1b 100644
--- a/plc4go/internal/cbus/Reader.go
+++ b/plc4go/internal/cbus/Reader.go
@@ -24,6 +24,7 @@ import (
"github.com/apache/plc4x/plc4go/spi/options"
"github.com/apache/plc4x/plc4go/spi/transactions"
"github.com/rs/zerolog"
+ "github.com/rs/zerolog/log"
"sync"
"time"
@@ -200,7 +201,9 @@ func (m *Reader) sendMessageOverTheWire(ctx
context.Context, transaction transac
// TODO: check if we can use a plcValueSerializer
encodedReply :=
embeddedReply.GetReply().(readWriteModel.ReplyEncodedReply).GetEncodedReply()
if err := MapEncodedReply(m.log, transaction, encodedReply,
tagName, addResponseCode, addPlcValue); err != nil {
- return errors.Wrap(err, "error encoding reply")
+ log.Error().Err(err).Msg("error encoding reply")
+ addResponseCode(tagName,
apiModel.PlcResponseCode_INTERNAL_ERROR)
+ return transaction.EndRequest()
}
return transaction.EndRequest()
}, func(err error) error {
diff --git a/plc4go/internal/cbus/Reader_test.go
b/plc4go/internal/cbus/Reader_test.go
index bb93973550..b5baf829e0 100644
--- a/plc4go/internal/cbus/Reader_test.go
+++ b/plc4go/internal/cbus/Reader_test.go
@@ -29,12 +29,12 @@ import (
"github.com/apache/plc4x/plc4go/spi/testutils"
"github.com/apache/plc4x/plc4go/spi/transactions"
"github.com/apache/plc4x/plc4go/spi/transports/test"
+ "github.com/apache/plc4x/plc4go/spi/utils"
"github.com/pkg/errors"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"net/url"
"strings"
- "sync"
"sync/atomic"
"testing"
"time"
@@ -405,16 +405,15 @@ func TestReader_sendMessageOverTheWire(t *testing.T) {
ctx context.Context
transaction transactions.RequestTransaction
messageToSend readWriteModel.CBusMessage
- addResponseCode func(t *testing.T, wg *sync.WaitGroup)
func(name string, responseCode apiModel.PlcResponseCode)
+ addResponseCode func(t *testing.T) func(name string,
responseCode apiModel.PlcResponseCode)
tagName string
- addPlcValue func(t *testing.T, wg *sync.WaitGroup)
func(name string, plcValue apiValues.PlcValue)
+ addPlcValue func(t *testing.T) func(name string, plcValue
apiValues.PlcValue)
}
tests := []struct {
name string
fields fields
args args
- setup func(t *testing.T, fields *fields, args *args)
- wg *sync.WaitGroup
+ setup func(t *testing.T, fields *fields, args *args, ch chan
struct{})
}{
{
name: "Send message empty message",
@@ -428,23 +427,21 @@ func TestReader_sendMessageOverTheWire(t *testing.T) {
return timeout
}(),
messageToSend: nil,
- addResponseCode: func(t *testing.T, wg
*sync.WaitGroup) func(name string, responseCode apiModel.PlcResponseCode) {
+ addResponseCode: func(t *testing.T) func(name
string, responseCode apiModel.PlcResponseCode) {
return func(name string, responseCode
apiModel.PlcResponseCode) {
t.Logf("Got response code %s
for %s", responseCode, name)
assert.Equal(t, "horst", name)
assert.Equal(t,
apiModel.PlcResponseCode_INTERNAL_ERROR, responseCode)
- wg.Done()
}
},
tagName: "horst",
- addPlcValue: func(t *testing.T, wg
*sync.WaitGroup) func(name string, plcValue apiValues.PlcValue) {
+ addPlcValue: func(t *testing.T) func(name
string, plcValue apiValues.PlcValue) {
return func(name string, plcValue
apiValues.PlcValue) {
t.Logf("Got response %s for
%s", plcValue, name)
- wg.Done()
}
},
},
- setup: func(t *testing.T, fields *fields, args *args) {
+ setup: func(t *testing.T, fields *fields, args *args,
ch chan struct{}) {
testutils.SetToTestingLogger(t,
readWriteModel.Plc4xModelLog)
loggerOption :=
options.WithCustomLogger(testutils.ProduceTestingLogger(t))
@@ -471,10 +468,11 @@ func TestReader_sendMessageOverTheWire(t *testing.T) {
transaction := NewMockRequestTransaction(t)
expect := transaction.EXPECT()
-
expect.FailRequest(mock.Anything).Return(errors.New("no I say"))
+
expect.FailRequest(mock.Anything).Return(errors.New("no I say")).Run(func(_
error) {
+ close(ch)
+ })
args.transaction = transaction
},
- wg: &sync.WaitGroup{},
},
{
name: "Send message which responds with message to
client",
@@ -543,31 +541,30 @@ func TestReader_sendMessageOverTheWire(t *testing.T) {
nil,
nil,
),
- addResponseCode: func(t *testing.T, wg
*sync.WaitGroup) func(name string, responseCode apiModel.PlcResponseCode) {
+ addResponseCode: func(t *testing.T) func(name
string, responseCode apiModel.PlcResponseCode) {
return func(name string, responseCode
apiModel.PlcResponseCode) {
t.Logf("Got response code %s
for %s", responseCode, name)
assert.Equal(t, "horst", name)
assert.Equal(t,
apiModel.PlcResponseCode_REQUEST_TIMEOUT, responseCode)
- wg.Done()
}
},
tagName: "horst",
- addPlcValue: func(t *testing.T, wg
*sync.WaitGroup) func(name string, plcValue apiValues.PlcValue) {
+ addPlcValue: func(t *testing.T) func(name
string, plcValue apiValues.PlcValue) {
return func(name string, plcValue
apiValues.PlcValue) {
t.Logf("Got response %s for
%s", plcValue, name)
- wg.Done()
}
},
},
- setup: func(t *testing.T, fields *fields, args *args) {
+ setup: func(t *testing.T, fields *fields, args *args,
ch chan struct{}) {
testutils.SetToTestingLogger(t,
readWriteModel.Plc4xModelLog)
transaction := NewMockRequestTransaction(t)
expect := transaction.EXPECT()
-
expect.FailRequest(mock.Anything).Return(errors.New("Nope"))
+
expect.FailRequest(mock.Anything).Return(errors.New("Nope")).Run(func(_ error) {
+ close(ch)
+ })
args.transaction = transaction
},
- wg: &sync.WaitGroup{},
},
{
name: "Send message which responds with server error",
@@ -636,31 +633,30 @@ func TestReader_sendMessageOverTheWire(t *testing.T) {
nil,
nil,
),
- addResponseCode: func(t *testing.T, wg
*sync.WaitGroup) func(name string, responseCode apiModel.PlcResponseCode) {
+ addResponseCode: func(t *testing.T) func(name
string, responseCode apiModel.PlcResponseCode) {
return func(name string, responseCode
apiModel.PlcResponseCode) {
t.Logf("Got response code %s
for %s", responseCode, name)
assert.Equal(t, "horst", name)
assert.Equal(t,
apiModel.PlcResponseCode_INVALID_DATA, responseCode)
- wg.Done()
}
},
tagName: "horst",
- addPlcValue: func(t *testing.T, wg
*sync.WaitGroup) func(name string, plcValue apiValues.PlcValue) {
+ addPlcValue: func(t *testing.T) func(name
string, plcValue apiValues.PlcValue) {
return func(name string, plcValue
apiValues.PlcValue) {
t.Logf("Got response %s for
%s", plcValue, name)
- wg.Done()
}
},
},
- setup: func(t *testing.T, fields *fields, args *args) {
+ setup: func(t *testing.T, fields *fields, args *args,
ch chan struct{}) {
testutils.SetToTestingLogger(t,
readWriteModel.Plc4xModelLog)
transaction := NewMockRequestTransaction(t)
expect := transaction.EXPECT()
- expect.EndRequest().Return(nil)
+ expect.EndRequest().Return(nil).Run(func() {
+ close(ch)
+ })
args.transaction = transaction
},
- wg: &sync.WaitGroup{},
},
{
name: "Send message which responds with too many
retransmissions",
@@ -692,26 +688,26 @@ func TestReader_sendMessageOverTheWire(t *testing.T) {
nil,
nil,
),
- addResponseCode: func(t *testing.T, wg
*sync.WaitGroup) func(name string, responseCode apiModel.PlcResponseCode) {
+ addResponseCode: func(t *testing.T) func(name
string, responseCode apiModel.PlcResponseCode) {
return func(name string, responseCode
apiModel.PlcResponseCode) {
t.Logf("Got response code %s
for %s", responseCode, name)
assert.Equal(t, "horst", name)
assert.Equal(t,
apiModel.PlcResponseCode_REMOTE_ERROR, responseCode)
- wg.Done()
}
},
tagName: "horst",
- addPlcValue: func(t *testing.T, wg
*sync.WaitGroup) func(name string, plcValue apiValues.PlcValue) {
+ addPlcValue: func(t *testing.T) func(name
string, plcValue apiValues.PlcValue) {
return func(name string, plcValue
apiValues.PlcValue) {
t.Logf("Got response %s for
%s", plcValue, name)
- wg.Done()
}
},
},
- setup: func(t *testing.T, fields *fields, args *args) {
+ setup: func(t *testing.T, fields *fields, args *args,
ch chan struct{}) {
transaction := NewMockRequestTransaction(t)
expect := transaction.EXPECT()
- expect.EndRequest().Return(nil)
+ expect.EndRequest().Return(nil).Run(func() {
+ close(ch)
+ })
args.transaction = transaction
// Setup logger
@@ -759,7 +755,6 @@ func TestReader_sendMessageOverTheWire(t *testing.T) {
}
fields.messageCodec = codec
},
- wg: &sync.WaitGroup{},
},
{
name: "Send message which responds with corruption",
@@ -791,26 +786,26 @@ func TestReader_sendMessageOverTheWire(t *testing.T) {
nil,
nil,
),
- addResponseCode: func(t *testing.T, wg
*sync.WaitGroup) func(name string, responseCode apiModel.PlcResponseCode) {
+ addResponseCode: func(t *testing.T) func(name
string, responseCode apiModel.PlcResponseCode) {
return func(name string, responseCode
apiModel.PlcResponseCode) {
t.Logf("Got response code %s
for %s", responseCode, name)
assert.Equal(t, "horst", name)
assert.Equal(t,
apiModel.PlcResponseCode_INVALID_DATA, responseCode)
- wg.Done()
}
},
tagName: "horst",
- addPlcValue: func(t *testing.T, wg
*sync.WaitGroup) func(name string, plcValue apiValues.PlcValue) {
+ addPlcValue: func(t *testing.T) func(name
string, plcValue apiValues.PlcValue) {
return func(name string, plcValue
apiValues.PlcValue) {
t.Logf("Got response %s for
%s", plcValue, name)
- wg.Done()
}
},
},
- setup: func(t *testing.T, fields *fields, args *args) {
+ setup: func(t *testing.T, fields *fields, args *args,
ch chan struct{}) {
transaction := NewMockRequestTransaction(t)
expect := transaction.EXPECT()
- expect.EndRequest().Return(nil)
+ expect.EndRequest().Return(nil).Run(func() {
+ close(ch)
+ })
args.transaction = transaction
// Setup logger
@@ -858,7 +853,6 @@ func TestReader_sendMessageOverTheWire(t *testing.T) {
}
fields.messageCodec = codec
},
- wg: &sync.WaitGroup{},
},
{
name: "Send message which responds with sync loss",
@@ -890,26 +884,26 @@ func TestReader_sendMessageOverTheWire(t *testing.T) {
nil,
nil,
),
- addResponseCode: func(t *testing.T, wg
*sync.WaitGroup) func(name string, responseCode apiModel.PlcResponseCode) {
+ addResponseCode: func(t *testing.T) func(name
string, responseCode apiModel.PlcResponseCode) {
return func(name string, responseCode
apiModel.PlcResponseCode) {
t.Logf("Got response code %s
for %s", responseCode, name)
assert.Equal(t, "horst", name)
assert.Equal(t,
apiModel.PlcResponseCode_REMOTE_BUSY, responseCode)
- wg.Done()
}
},
tagName: "horst",
- addPlcValue: func(t *testing.T, wg
*sync.WaitGroup) func(name string, plcValue apiValues.PlcValue) {
+ addPlcValue: func(t *testing.T) func(name
string, plcValue apiValues.PlcValue) {
return func(name string, plcValue
apiValues.PlcValue) {
t.Logf("Got response %s for
%s", plcValue, name)
- wg.Done()
}
},
},
- setup: func(t *testing.T, fields *fields, args *args) {
+ setup: func(t *testing.T, fields *fields, args *args,
ch chan struct{}) {
transaction := NewMockRequestTransaction(t)
expect := transaction.EXPECT()
- expect.EndRequest().Return(nil)
+ expect.EndRequest().Return(nil).Run(func() {
+ close(ch)
+ })
args.transaction = transaction
// Setup logger
@@ -957,7 +951,6 @@ func TestReader_sendMessageOverTheWire(t *testing.T) {
}
fields.messageCodec = codec
},
- wg: &sync.WaitGroup{},
},
{
name: "Send message which responds with too long",
@@ -989,26 +982,26 @@ func TestReader_sendMessageOverTheWire(t *testing.T) {
nil,
nil,
),
- addResponseCode: func(t *testing.T, wg
*sync.WaitGroup) func(name string, responseCode apiModel.PlcResponseCode) {
+ addResponseCode: func(t *testing.T) func(name
string, responseCode apiModel.PlcResponseCode) {
return func(name string, responseCode
apiModel.PlcResponseCode) {
t.Logf("Got response code %s
for %s", responseCode, name)
assert.Equal(t, "horst", name)
assert.Equal(t,
apiModel.PlcResponseCode_INVALID_DATA, responseCode)
- wg.Done()
}
},
tagName: "horst",
- addPlcValue: func(t *testing.T, wg
*sync.WaitGroup) func(name string, plcValue apiValues.PlcValue) {
+ addPlcValue: func(t *testing.T) func(name
string, plcValue apiValues.PlcValue) {
return func(name string, plcValue
apiValues.PlcValue) {
t.Logf("Got response %s for
%s", plcValue, name)
- wg.Done()
}
},
},
- setup: func(t *testing.T, fields *fields, args *args) {
+ setup: func(t *testing.T, fields *fields, args *args,
ch chan struct{}) {
transaction := NewMockRequestTransaction(t)
expect := transaction.EXPECT()
- expect.EndRequest().Return(nil)
+ expect.EndRequest().Return(nil).Run(func() {
+ close(ch)
+ })
args.transaction = transaction
// Setup logger
@@ -1056,7 +1049,6 @@ func TestReader_sendMessageOverTheWire(t *testing.T) {
}
fields.messageCodec = codec
},
- wg: &sync.WaitGroup{},
},
{
name: "Send message which responds with confirm only",
@@ -1088,26 +1080,26 @@ func TestReader_sendMessageOverTheWire(t *testing.T) {
nil,
nil,
),
- addResponseCode: func(t *testing.T, wg
*sync.WaitGroup) func(name string, responseCode apiModel.PlcResponseCode) {
+ addResponseCode: func(t *testing.T) func(name
string, responseCode apiModel.PlcResponseCode) {
return func(name string, responseCode
apiModel.PlcResponseCode) {
t.Logf("Got response code %s
for %s", responseCode, name)
assert.Equal(t, "horst", name)
assert.Equal(t,
apiModel.PlcResponseCode_NOT_FOUND, responseCode)
- wg.Done()
}
},
tagName: "horst",
- addPlcValue: func(t *testing.T, wg
*sync.WaitGroup) func(name string, plcValue apiValues.PlcValue) {
+ addPlcValue: func(t *testing.T) func(name
string, plcValue apiValues.PlcValue) {
return func(name string, plcValue
apiValues.PlcValue) {
t.Logf("Got response %s for
%s", plcValue, name)
- wg.Done()
}
},
},
- setup: func(t *testing.T, fields *fields, args *args) {
+ setup: func(t *testing.T, fields *fields, args *args,
ch chan struct{}) {
transaction := NewMockRequestTransaction(t)
expect := transaction.EXPECT()
- expect.EndRequest().Return(nil)
+ expect.EndRequest().Return(nil).Run(func() {
+ close(ch)
+ })
args.transaction = transaction
// Setup logger
@@ -1155,7 +1147,6 @@ func TestReader_sendMessageOverTheWire(t *testing.T) {
}
fields.messageCodec = codec
},
- wg: &sync.WaitGroup{},
},
{
name: "Send message which responds with ok",
@@ -1187,26 +1178,26 @@ func TestReader_sendMessageOverTheWire(t *testing.T) {
nil,
nil,
),
- addResponseCode: func(t *testing.T, wg
*sync.WaitGroup) func(name string, responseCode apiModel.PlcResponseCode) {
+ addResponseCode: func(t *testing.T) func(name
string, responseCode apiModel.PlcResponseCode) {
return func(name string, responseCode
apiModel.PlcResponseCode) {
t.Logf("Got response code %s
for %s", responseCode, name)
assert.Equal(t, "horst", name)
assert.Equal(t,
apiModel.PlcResponseCode_OK, responseCode)
- wg.Done()
}
},
tagName: "horst",
- addPlcValue: func(t *testing.T, wg
*sync.WaitGroup) func(name string, plcValue apiValues.PlcValue) {
+ addPlcValue: func(t *testing.T) func(name
string, plcValue apiValues.PlcValue) {
return func(name string, plcValue
apiValues.PlcValue) {
t.Logf("Got response %s for
%s", plcValue, name)
- wg.Done()
}
},
},
- setup: func(t *testing.T, fields *fields, args *args) {
+ setup: func(t *testing.T, fields *fields, args *args,
ch chan struct{}) {
transaction := NewMockRequestTransaction(t)
expect := transaction.EXPECT()
- expect.EndRequest().Return(nil)
+ expect.EndRequest().Return(nil).Run(func() {
+ close(ch)
+ })
args.transaction = transaction
// Setup logger
@@ -1254,28 +1245,29 @@ func TestReader_sendMessageOverTheWire(t *testing.T) {
}
fields.messageCodec = codec
},
- wg: func() *sync.WaitGroup {
- wg := &sync.WaitGroup{}
- wg.Add(1) // We getting an response and a value
- return wg
- }(),
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
+ ch := make(chan struct{})
if tt.setup != nil {
- tt.setup(t, &tt.fields, &tt.args)
+ tt.setup(t, &tt.fields, &tt.args, ch)
}
m := &Reader{
alphaGenerator: tt.fields.alphaGenerator,
messageCodec: tt.fields.messageCodec,
tm: tt.fields.tm,
}
- tt.wg.Add(1)
- m.sendMessageOverTheWire(tt.args.ctx,
tt.args.transaction, tt.args.messageToSend, tt.args.addResponseCode(t, tt.wg),
tt.args.tagName, tt.args.addPlcValue(t, tt.wg))
+ m.sendMessageOverTheWire(tt.args.ctx,
tt.args.transaction, tt.args.messageToSend, tt.args.addResponseCode(t),
tt.args.tagName, tt.args.addPlcValue(t))
t.Log("Waiting now")
- tt.wg.Wait() // TODO: we need to timeout this too
- t.Log("Done waiting")
+ timer := time.NewTimer(3 * time.Second)
+ defer utils.CleanupTimer(timer)
+ select {
+ case <-ch:
+ t.Log("Done waiting")
+ case <-timer.C:
+ t.Error("Timeout")
+ }
})
}
}