This is an automated email from the ASF dual-hosted git repository. sruehl pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/plc4x.git
commit 749920c4e67e335beeb0361c70f47b6b7b8b632c Author: Sebastian Rühl <[email protected]> AuthorDate: Fri Nov 14 10:03:05 2025 +0100 feat(plc4go/spi): expectation cancellation --- plc4go/internal/simulated/Connection.go | 2 +- plc4go/internal/simulated/Driver.go | 3 +- plc4go/spi/MessageCodec.go | 1 + plc4go/spi/default/DefaultCodec.go | 8 +- plc4go/spi/default/DefaultCodec_test.go | 215 +++++++++++++++++++-------- plc4go/spi/default/DefaultConnection.go | 6 +- plc4go/spi/default/DefaultConnection_test.go | 12 +- plc4go/spi/default/defaultExpectation.go | 19 ++- plc4go/spi/default/mocks_test.go | 102 +++++++++---- plc4go/spi/mocks_test.go | 40 +++++ plc4go/spi/tracer/Tracer.go | 2 +- plc4go/spi/utils/IdGenerator.go | 12 +- plc4go/spi/utils/IdGenerator_test.go | 2 +- 13 files changed, 299 insertions(+), 125 deletions(-) diff --git a/plc4go/internal/simulated/Connection.go b/plc4go/internal/simulated/Connection.go index 948dc777b4..ae346a962a 100644 --- a/plc4go/internal/simulated/Connection.go +++ b/plc4go/internal/simulated/Connection.go @@ -61,7 +61,7 @@ func NewConnection(device *Device, tagHandler spi.PlcTagHandler, valueHandler sp valueHandler: valueHandler, options: connectionOptions, connected: false, - connectionId: utils.GenerateId(customLogger, 4), + connectionId: utils.GenerateId(4), log: customLogger, } diff --git a/plc4go/internal/simulated/Driver.go b/plc4go/internal/simulated/Driver.go index dfdb504dce..4365ab4ce9 100644 --- a/plc4go/internal/simulated/Driver.go +++ b/plc4go/internal/simulated/Driver.go @@ -63,10 +63,11 @@ func (d *Driver) GetConnection(ctx context.Context, _ url.URL, _ map[string]tran driverOptions, append(d._options, options.WithCustomLogger(d.log))..., ) - d.log.Debug().Stringer("connection", connection).Msg("Connecting and returning connection") + d.log.Trace().Stringer("connection", connection).Msg("Connecting") if err := connection.Connect(ctx); err != nil { return nil, errors.Wrap(err, "Error connecting connection") } + d.log.Trace().Stringer("connection", connection).Msg("Connected") return connection, nil } diff --git a/plc4go/spi/MessageCodec.go b/plc4go/spi/MessageCodec.go index f8c74de2ac..279e75a048 100644 --- a/plc4go/spi/MessageCodec.go +++ b/plc4go/spi/MessageCodec.go @@ -28,6 +28,7 @@ import ( type Expectation interface { fmt.Stringer GetContext() context.Context + Cancel(cause error) GetCreationTime() time.Time GetExpiration() time.Time GetAcceptsMessage() AcceptsMessage diff --git a/plc4go/spi/default/DefaultCodec.go b/plc4go/spi/default/DefaultCodec.go index a690d8adf9..7462effefb 100644 --- a/plc4go/spi/default/DefaultCodec.go +++ b/plc4go/spi/default/DefaultCodec.go @@ -192,7 +192,9 @@ func (m *defaultCodec) Disconnect() error { } for _, expectation := range m.expectations { m.wg.Go(func() { - _ = expectation.GetHandleError()(errors.New("disconnected")) + err := errors.New("disconnected") + expectation.Cancel(err) + _ = expectation.GetHandleError()(err) }) } m.wg.Wait() @@ -243,7 +245,9 @@ func (m *defaultCodec) TimeoutExpectations(now time.Time) time.Duration { m.log.Debug().Stringer("expectation", expectation).Msg("timeout expectation") // Call the error handler. m.wg.Go(func() { - if err := expectation.GetHandleError()(utils.NewTimeoutError(expectation.GetExpiration().Sub(expectation.GetCreationTime()))); err != nil { + timeoutErr := utils.NewTimeoutError(expectation.GetExpiration().Sub(expectation.GetCreationTime())) + expectation.Cancel(timeoutErr) + if err := expectation.GetHandleError()(timeoutErr); err != nil { m.log.Error().Err(err).Msg("Got an error handling error on expectation") } }) diff --git a/plc4go/spi/default/DefaultCodec_test.go b/plc4go/spi/default/DefaultCodec_test.go index dcb0114223..18d5db00c3 100644 --- a/plc4go/spi/default/DefaultCodec_test.go +++ b/plc4go/spi/default/DefaultCodec_test.go @@ -40,7 +40,8 @@ import ( func TestDefaultExpectation_GetAcceptsMessage(t *testing.T) { type fields struct { - Context context.Context + Ctx context.Context + CancelFunc context.CancelCauseFunc Expiration time.Time AcceptsMessage spi.AcceptsMessage HandleMessage spi.HandleMessage @@ -62,7 +63,8 @@ func TestDefaultExpectation_GetAcceptsMessage(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { m := &defaultExpectation{ - Context: tt.fields.Context, + Ctx: tt.fields.Ctx, + CancelFunc: tt.fields.CancelFunc, Expiration: tt.fields.Expiration, AcceptsMessage: tt.fields.AcceptsMessage, HandleMessage: tt.fields.HandleMessage, @@ -75,7 +77,8 @@ func TestDefaultExpectation_GetAcceptsMessage(t *testing.T) { func TestDefaultExpectation_GetContext(t *testing.T) { type fields struct { - Context context.Context + Ctx context.Context + CancelFunc context.CancelCauseFunc Expiration time.Time AcceptsMessage spi.AcceptsMessage HandleMessage spi.HandleMessage @@ -93,7 +96,8 @@ func TestDefaultExpectation_GetContext(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { m := &defaultExpectation{ - Context: tt.fields.Context, + Ctx: tt.fields.Ctx, + CancelFunc: tt.fields.CancelFunc, Expiration: tt.fields.Expiration, AcceptsMessage: tt.fields.AcceptsMessage, HandleMessage: tt.fields.HandleMessage, @@ -106,7 +110,8 @@ func TestDefaultExpectation_GetContext(t *testing.T) { func TestDefaultExpectation_GetExpiration(t *testing.T) { type fields struct { - Context context.Context + Ctx context.Context + CancelFunc context.CancelCauseFunc Expiration time.Time AcceptsMessage spi.AcceptsMessage HandleMessage spi.HandleMessage @@ -124,7 +129,8 @@ func TestDefaultExpectation_GetExpiration(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { m := &defaultExpectation{ - Context: tt.fields.Context, + Ctx: tt.fields.Ctx, + CancelFunc: tt.fields.CancelFunc, Expiration: tt.fields.Expiration, AcceptsMessage: tt.fields.AcceptsMessage, HandleMessage: tt.fields.HandleMessage, @@ -137,7 +143,8 @@ func TestDefaultExpectation_GetExpiration(t *testing.T) { func TestDefaultExpectation_GetHandleError(t *testing.T) { type fields struct { - Context context.Context + Ctx context.Context + CancelFunc context.CancelCauseFunc Expiration time.Time AcceptsMessage spi.AcceptsMessage HandleMessage spi.HandleMessage @@ -159,7 +166,8 @@ func TestDefaultExpectation_GetHandleError(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { m := &defaultExpectation{ - Context: tt.fields.Context, + Ctx: tt.fields.Ctx, + CancelFunc: tt.fields.CancelFunc, Expiration: tt.fields.Expiration, AcceptsMessage: tt.fields.AcceptsMessage, HandleMessage: tt.fields.HandleMessage, @@ -172,7 +180,8 @@ func TestDefaultExpectation_GetHandleError(t *testing.T) { func TestDefaultExpectation_GetHandleMessage(t *testing.T) { type fields struct { - Context context.Context + Ctx context.Context + CancelFunc context.CancelCauseFunc Expiration time.Time AcceptsMessage spi.AcceptsMessage HandleMessage spi.HandleMessage @@ -194,7 +203,8 @@ func TestDefaultExpectation_GetHandleMessage(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { m := &defaultExpectation{ - Context: tt.fields.Context, + Ctx: tt.fields.Ctx, + CancelFunc: tt.fields.CancelFunc, Expiration: tt.fields.Expiration, AcceptsMessage: tt.fields.AcceptsMessage, HandleMessage: tt.fields.HandleMessage, @@ -207,7 +217,8 @@ func TestDefaultExpectation_GetHandleMessage(t *testing.T) { func TestDefaultExpectation_String(t *testing.T) { type fields struct { - Context context.Context + Ctx context.Context + CancelFunc context.CancelCauseFunc Expiration time.Time AcceptsMessage spi.AcceptsMessage HandleMessage spi.HandleMessage @@ -226,7 +237,8 @@ func TestDefaultExpectation_String(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { m := &defaultExpectation{ - Context: tt.fields.Context, + Ctx: tt.fields.Ctx, + CancelFunc: tt.fields.CancelFunc, Expiration: tt.fields.Expiration, AcceptsMessage: tt.fields.AcceptsMessage, HandleMessage: tt.fields.HandleMessage, @@ -613,13 +625,17 @@ func Test_defaultCodec_HandleMessages(t *testing.T) { fields: fields{ expectations: []spi.Expectation{ &defaultExpectation{ // doesn't accept - uuid: uuid.New(), + Uuid: uuid.New(), + Ctx: t.Context(), + CancelFunc: func(_ error) {}, AcceptsMessage: func(_ spi.Message) bool { return false }, }, &defaultExpectation{ // accepts but fails - uuid: uuid.New(), + Uuid: uuid.New(), + Ctx: t.Context(), + CancelFunc: func(_ error) {}, AcceptsMessage: func(_ spi.Message) bool { return true }, @@ -631,7 +647,9 @@ func Test_defaultCodec_HandleMessages(t *testing.T) { }, }, &defaultExpectation{ // accepts but fails and fails to handle the error - uuid: uuid.New(), + Uuid: uuid.New(), + Ctx: t.Context(), + CancelFunc: func(_ error) {}, AcceptsMessage: func(_ spi.Message) bool { return true }, @@ -643,7 +661,9 @@ func Test_defaultCodec_HandleMessages(t *testing.T) { }, }, &defaultExpectation{ // accepts - uuid: uuid.New(), + Uuid: uuid.New(), + Ctx: t.Context(), + CancelFunc: func(_ error) {}, AcceptsMessage: func(_ spi.Message) bool { return true }, @@ -652,7 +672,9 @@ func Test_defaultCodec_HandleMessages(t *testing.T) { }, }, &defaultExpectation{ // accepts - uuid: uuid.New(), + Uuid: uuid.New(), + Ctx: t.Context(), + CancelFunc: func(_ error) {}, AcceptsMessage: func(_ spi.Message) bool { return true }, @@ -661,7 +683,9 @@ func Test_defaultCodec_HandleMessages(t *testing.T) { }, }, &defaultExpectation{ // accepts - uuid: uuid.New(), + Uuid: uuid.New(), + Ctx: t.Context(), + CancelFunc: func(_ error) {}, AcceptsMessage: func(_ spi.Message) bool { return true }, @@ -670,7 +694,9 @@ func Test_defaultCodec_HandleMessages(t *testing.T) { }, }, &defaultExpectation{ // not accept - uuid: uuid.New(), + Uuid: uuid.New(), + Ctx: t.Context(), + CancelFunc: func(_ error) {}, AcceptsMessage: func(_ spi.Message) bool { return false }, @@ -679,7 +705,9 @@ func Test_defaultCodec_HandleMessages(t *testing.T) { }, }, &defaultExpectation{ // accepts - uuid: uuid.New(), + Uuid: uuid.New(), + Ctx: t.Context(), + CancelFunc: func(_ error) {}, AcceptsMessage: func(_ spi.Message) bool { return true }, @@ -710,13 +738,17 @@ func Test_defaultCodec_HandleMessages(t *testing.T) { }) fields.expectations = []spi.Expectation{ &defaultExpectation{ // doesn't accept - uuid: uuid.New(), + Uuid: uuid.New(), + Ctx: t.Context(), + CancelFunc: func(_ error) {}, AcceptsMessage: func(_ spi.Message) bool { return false }, }, &defaultExpectation{ // accepts but fails // accept1 - uuid: uuid.New(), + Uuid: uuid.New(), + Ctx: t.Context(), + CancelFunc: func(_ error) {}, AcceptsMessage: func(_ spi.Message) bool { return true }, @@ -729,7 +761,9 @@ func Test_defaultCodec_HandleMessages(t *testing.T) { }, }, &defaultExpectation{ // accepts but fails and fails to handle the error // accept2 - uuid: uuid.New(), + Uuid: uuid.New(), + Ctx: t.Context(), + CancelFunc: func(_ error) {}, AcceptsMessage: func(_ spi.Message) bool { return true }, @@ -742,7 +776,9 @@ func Test_defaultCodec_HandleMessages(t *testing.T) { }, }, &defaultExpectation{ // accepts // accept3 - uuid: uuid.New(), + Uuid: uuid.New(), + Ctx: t.Context(), + CancelFunc: func(_ error) {}, AcceptsMessage: func(_ spi.Message) bool { return true }, @@ -752,7 +788,9 @@ func Test_defaultCodec_HandleMessages(t *testing.T) { }, }, &defaultExpectation{ // accepts // accept4 - uuid: uuid.New(), + Uuid: uuid.New(), + Ctx: t.Context(), + CancelFunc: func(_ error) {}, AcceptsMessage: func(_ spi.Message) bool { return true }, @@ -762,7 +800,9 @@ func Test_defaultCodec_HandleMessages(t *testing.T) { }, }, &defaultExpectation{ // not accept // accept5 - uuid: uuid.New(), + Uuid: uuid.New(), + Ctx: t.Context(), + CancelFunc: func(_ error) {}, AcceptsMessage: func(_ spi.Message) bool { return true }, @@ -772,7 +812,9 @@ func Test_defaultCodec_HandleMessages(t *testing.T) { }, }, &defaultExpectation{ // not accept - uuid: uuid.New(), + Uuid: uuid.New(), + Ctx: t.Context(), + CancelFunc: func(_ error) {}, AcceptsMessage: func(_ spi.Message) bool { return false }, @@ -781,7 +823,9 @@ func Test_defaultCodec_HandleMessages(t *testing.T) { }, }, &defaultExpectation{ // accepts // accept6 - uuid: uuid.New(), + Uuid: uuid.New(), + Ctx: t.Context(), + CancelFunc: func(_ error) {}, AcceptsMessage: func(_ spi.Message) bool { return true }, @@ -962,30 +1006,34 @@ func Test_defaultCodec_TimeoutExpectations(t *testing.T) { fields: fields{ expectations: []spi.Expectation{ &defaultExpectation{ // Expired - Context: t.Context(), + Ctx: t.Context(), + CancelFunc: func(_ error) {}, HandleError: func(err error) error { return nil }, }, &defaultExpectation{ // Expired errors - Context: t.Context(), + Ctx: t.Context(), + CancelFunc: func(_ error) {}, HandleError: func(err error) error { return errors.New("yep") }, }, &defaultExpectation{ // Fine - Context: t.Context(), + Ctx: t.Context(), + CancelFunc: func(_ error) {}, HandleError: func(err error) error { return errors.New("yep") }, Expiration: time.Time{}.Add(3 * time.Hour), }, &defaultExpectation{ // Context error - Context: func() context.Context { + Ctx: func() context.Context { ctx, cancelFunc := context.WithCancel(t.Context()) cancelFunc() // Cancel it instantly return ctx }(), + CancelFunc: func(_ error) {}, HandleError: func(err error) error { return errors.New("yep") }, @@ -1015,21 +1063,24 @@ func Test_defaultCodec_TimeoutExpectations(t *testing.T) { }) fields.expectations = []spi.Expectation{ &defaultExpectation{ // Expired - Context: t.Context(), + Ctx: t.Context(), + CancelFunc: func(_ error) {}, HandleError: func(err error) error { handle1.Store(true) return nil }, }, &defaultExpectation{ // Expired errors - Context: t.Context(), + Ctx: t.Context(), + CancelFunc: func(_ error) {}, HandleError: func(err error) error { handle2.Store(true) return errors.New("yep") }, }, &defaultExpectation{ // Fine - Context: t.Context(), + Ctx: t.Context(), + CancelFunc: func(_ error) {}, HandleError: func(err error) error { handle3.Store(true) return errors.New("yep") @@ -1037,11 +1088,12 @@ func Test_defaultCodec_TimeoutExpectations(t *testing.T) { Expiration: time.Time{}.Add(3 * time.Hour), }, &defaultExpectation{ // Context error - Context: func() context.Context { + Ctx: func() context.Context { ctx, cancelFunc := context.WithCancel(t.Context()) cancelFunc() // Cancel it instantly return ctx }(), + CancelFunc: func(_ error) {}, HandleError: func(err error) error { handle4.Store(true) return errors.New("yep") @@ -1049,7 +1101,8 @@ func Test_defaultCodec_TimeoutExpectations(t *testing.T) { Expiration: time.Time{}.Add(3 * time.Hour), }, &defaultExpectation{ // Fine - Context: t.Context(), + Ctx: t.Context(), + CancelFunc: func(_ error) {}, HandleError: func(err error) error { handle5.Store(true) return errors.New("yep") @@ -1090,6 +1143,8 @@ func Test_defaultCodec_ReceiveWork(t *testing.T) { expectations []spi.Expectation running bool customMessageHandling CustomMessageHandler + ctx context.Context + ctxCancel context.CancelFunc } tests := []struct { name string @@ -1106,6 +1161,7 @@ func Test_defaultCodec_ReceiveWork(t *testing.T) { setup: func(t *testing.T, fields *fields) { requirements := NewMockDefaultCodecRequirements(t) fields.DefaultCodecRequirements = requirements + fields.ctx, fields.ctxCancel = context.WithTimeout(t.Context(), 2*time.Second) }, }, { @@ -1113,30 +1169,34 @@ func Test_defaultCodec_ReceiveWork(t *testing.T) { fields: fields{ expectations: []spi.Expectation{ &defaultExpectation{ // Expired - Context: t.Context(), + Ctx: t.Context(), + CancelFunc: func(_ error) {}, HandleError: func(err error) error { return nil }, }, &defaultExpectation{ // Expired errors - Context: t.Context(), + Ctx: t.Context(), + CancelFunc: func(_ error) {}, HandleError: func(err error) error { return errors.New("yep") }, }, &defaultExpectation{ // Fine - Context: t.Context(), + Ctx: t.Context(), + CancelFunc: func(_ error) {}, HandleError: func(err error) error { return errors.New("yep") }, Expiration: time.Time{}.Add(3 * time.Hour), }, &defaultExpectation{ // Context error - Context: func() context.Context { + Ctx: func() context.Context { ctx, cancelFunc := context.WithCancel(t.Context()) cancelFunc() // Cancel it instantly return ctx }(), + CancelFunc: func(_ error) {}, HandleError: func(err error) error { return errors.New("yep") }, @@ -1148,6 +1208,7 @@ func Test_defaultCodec_ReceiveWork(t *testing.T) { requirements := NewMockDefaultCodecRequirements(t) requirements.EXPECT().Receive(mock.Anything).Return(nil, errors.New("nope")) fields.DefaultCodecRequirements = requirements + fields.ctx, fields.ctxCancel = context.WithTimeout(t.Context(), 2*time.Second) }, manipulator: func(t *testing.T, codec *defaultCodec) { codec.running.Store(true) @@ -1159,30 +1220,34 @@ func Test_defaultCodec_ReceiveWork(t *testing.T) { fields: fields{ expectations: []spi.Expectation{ &defaultExpectation{ // Expired - Context: t.Context(), + Ctx: t.Context(), + CancelFunc: func(_ error) {}, HandleError: func(err error) error { return nil }, }, &defaultExpectation{ // Expired errors - Context: t.Context(), + Ctx: t.Context(), + CancelFunc: func(_ error) {}, HandleError: func(err error) error { return errors.New("yep") }, }, &defaultExpectation{ // Fine - Context: t.Context(), + Ctx: t.Context(), + CancelFunc: func(_ error) {}, HandleError: func(err error) error { return errors.New("yep") }, Expiration: time.Time{}.Add(3 * time.Hour), }, &defaultExpectation{ // Context error - Context: func() context.Context { + Ctx: func() context.Context { ctx, cancelFunc := context.WithCancel(t.Context()) cancelFunc() // Cancel it instantly return ctx }(), + CancelFunc: func(_ error) {}, HandleError: func(err error) error { return errors.New("yep") }, @@ -1194,6 +1259,7 @@ func Test_defaultCodec_ReceiveWork(t *testing.T) { requirements := NewMockDefaultCodecRequirements(t) requirements.EXPECT().Receive(mock.Anything).Return(nil, nil) fields.DefaultCodecRequirements = requirements + fields.ctx, fields.ctxCancel = context.WithTimeout(t.Context(), 2*time.Second) }, manipulator: func(t *testing.T, codec *defaultCodec) { codec.running.Store(true) @@ -1205,30 +1271,34 @@ func Test_defaultCodec_ReceiveWork(t *testing.T) { fields: fields{ expectations: []spi.Expectation{ &defaultExpectation{ // Expired - Context: t.Context(), + Ctx: t.Context(), + CancelFunc: func(_ error) {}, HandleError: func(err error) error { return nil }, }, &defaultExpectation{ // Expired errors - Context: t.Context(), + Ctx: t.Context(), + CancelFunc: func(_ error) {}, HandleError: func(err error) error { return errors.New("yep") }, }, &defaultExpectation{ // Fine - Context: t.Context(), + Ctx: t.Context(), + CancelFunc: func(_ error) {}, HandleError: func(err error) error { return errors.New("yep") }, Expiration: time.Time{}.Add(3 * time.Hour), }, &defaultExpectation{ // Context error - Context: func() context.Context { + Ctx: func() context.Context { ctx, cancelFunc := context.WithCancel(t.Context()) cancelFunc() // Cancel it instantly return ctx }(), + CancelFunc: func(_ error) {}, HandleError: func(err error) error { return errors.New("yep") }, @@ -1240,6 +1310,7 @@ func Test_defaultCodec_ReceiveWork(t *testing.T) { requirements := NewMockDefaultCodecRequirements(t) requirements.EXPECT().Receive(mock.Anything).Return(NewMockMessage(t), nil) fields.DefaultCodecRequirements = requirements + fields.ctx, fields.ctxCancel = context.WithTimeout(t.Context(), 2*time.Second) }, manipulator: func(t *testing.T, codec *defaultCodec) { codec.running.Store(true) @@ -1252,7 +1323,8 @@ func Test_defaultCodec_ReceiveWork(t *testing.T) { defaultIncomingMessageChannel: make(chan spi.Message, 1), expectations: []spi.Expectation{ &defaultExpectation{ // Fine - Context: t.Context(), + Ctx: t.Context(), + CancelFunc: func(_ error) {}, HandleError: func(err error) error { return errors.New("yep") }, @@ -1264,6 +1336,7 @@ func Test_defaultCodec_ReceiveWork(t *testing.T) { requirements := NewMockDefaultCodecRequirements(t) requirements.EXPECT().Receive(mock.Anything).Return(NewMockMessage(t), nil) fields.DefaultCodecRequirements = requirements + fields.ctx, fields.ctxCancel = context.WithTimeout(t.Context(), 2*time.Second) }, manipulator: func(t *testing.T, codec *defaultCodec) { codec.running.Store(true) @@ -1275,30 +1348,34 @@ func Test_defaultCodec_ReceiveWork(t *testing.T) { fields: fields{ expectations: []spi.Expectation{ &defaultExpectation{ // Expired - Context: t.Context(), + Ctx: t.Context(), + CancelFunc: func(_ error) {}, HandleError: func(err error) error { return nil }, }, &defaultExpectation{ // Expired errors - Context: t.Context(), + Ctx: t.Context(), + CancelFunc: func(_ error) {}, HandleError: func(err error) error { return errors.New("yep") }, }, &defaultExpectation{ // Fine - Context: t.Context(), + Ctx: t.Context(), + CancelFunc: func(_ error) {}, HandleError: func(err error) error { return errors.New("yep") }, Expiration: time.Time{}.Add(3 * time.Hour), }, &defaultExpectation{ // Context error - Context: func() context.Context { + Ctx: func() context.Context { ctx, cancelFunc := context.WithCancel(t.Context()) cancelFunc() // Cancel it instantly return ctx }(), + CancelFunc: func(_ error) {}, HandleError: func(err error) error { return errors.New("yep") }, @@ -1310,6 +1387,7 @@ func Test_defaultCodec_ReceiveWork(t *testing.T) { requirements := NewMockDefaultCodecRequirements(t) requirements.EXPECT().Receive(mock.Anything).Return(nil, errors.New("nope")) fields.DefaultCodecRequirements = requirements + fields.ctx, fields.ctxCancel = context.WithTimeout(t.Context(), 2*time.Second) }, manipulator: func(t *testing.T, codec *defaultCodec) { codec.running.Store(true) @@ -1324,30 +1402,34 @@ func Test_defaultCodec_ReceiveWork(t *testing.T) { }, expectations: []spi.Expectation{ &defaultExpectation{ // Expired - Context: t.Context(), + Ctx: t.Context(), + CancelFunc: func(_ error) {}, HandleError: func(err error) error { return nil }, }, &defaultExpectation{ // Expired errors - Context: t.Context(), + Ctx: t.Context(), + CancelFunc: func(_ error) {}, HandleError: func(err error) error { return errors.New("yep") }, }, &defaultExpectation{ // Fine - Context: t.Context(), + Ctx: t.Context(), + CancelFunc: func(_ error) {}, HandleError: func(err error) error { return errors.New("yep") }, Expiration: time.Time{}.Add(3 * time.Hour), }, &defaultExpectation{ // Context error - Context: func() context.Context { + Ctx: func() context.Context { ctx, cancelFunc := context.WithCancel(t.Context()) cancelFunc() // Cancel it instantly return ctx }(), + CancelFunc: func(_ error) {}, HandleError: func(err error) error { return errors.New("yep") }, @@ -1359,6 +1441,7 @@ func Test_defaultCodec_ReceiveWork(t *testing.T) { requirements := NewMockDefaultCodecRequirements(t) requirements.EXPECT().Receive(mock.Anything).Return(NewMockMessage(t), nil) fields.DefaultCodecRequirements = requirements + fields.ctx, fields.ctxCancel = context.WithTimeout(t.Context(), 2*time.Second) }, manipulator: func(t *testing.T, codec *defaultCodec) { codec.running.Store(true) @@ -1373,30 +1456,34 @@ func Test_defaultCodec_ReceiveWork(t *testing.T) { }, expectations: []spi.Expectation{ &defaultExpectation{ // Expired - Context: t.Context(), + Ctx: t.Context(), + CancelFunc: func(_ error) {}, HandleError: func(err error) error { return nil }, }, &defaultExpectation{ // Expired errors - Context: t.Context(), + Ctx: t.Context(), + CancelFunc: func(_ error) {}, HandleError: func(err error) error { return errors.New("yep") }, }, &defaultExpectation{ // Fine - Context: t.Context(), + Ctx: t.Context(), + CancelFunc: func(_ error) {}, HandleError: func(err error) error { return errors.New("yep") }, Expiration: time.Time{}.Add(3 * time.Hour), }, &defaultExpectation{ // Context error - Context: func() context.Context { + Ctx: func() context.Context { ctx, cancelFunc := context.WithCancel(t.Context()) cancelFunc() // Cancel it instantly return ctx }(), + CancelFunc: func(_ error) {}, HandleError: func(err error) error { return errors.New("yep") }, @@ -1408,6 +1495,7 @@ func Test_defaultCodec_ReceiveWork(t *testing.T) { requirements := NewMockDefaultCodecRequirements(t) requirements.EXPECT().Receive(mock.Anything).Return(NewMockMessage(t), nil) fields.DefaultCodecRequirements = requirements + fields.ctx, fields.ctxCancel = context.WithTimeout(t.Context(), 2*time.Second) }, manipulator: func(t *testing.T, codec *defaultCodec) { codec.running.Store(true) @@ -1428,9 +1516,10 @@ func Test_defaultCodec_ReceiveWork(t *testing.T) { notifyExpireWorker: make(chan struct{}, 100), notifyReceiveWorker: make(chan struct{}, 100), customMessageHandling: tt.fields.customMessageHandling, + ctx: tt.fields.ctx, + ctxCancel: tt.fields.ctxCancel, log: testutils.ProduceTestingLogger(t), } - m.ctx, m.ctxCancel = context.WithCancel(t.Context()) if tt.manipulator != nil { tt.manipulator(t, m) } diff --git a/plc4go/spi/default/DefaultConnection.go b/plc4go/spi/default/DefaultConnection.go index 615c8da4b3..0400c45899 100644 --- a/plc4go/spi/default/DefaultConnection.go +++ b/plc4go/spi/default/DefaultConnection.go @@ -39,8 +39,8 @@ import ( // DefaultConnectionRequirements defines the required at a implementing connection when using DefaultConnection // additional options can be set using the functions returning WithOption (e.g. WithDefaultTtl, WithPlcTagHandler...) type DefaultConnectionRequirements interface { - // GetConnection should return the implementing connection when using DefaultConnection - GetConnection() plc4go.PlcConnection + // IsConnected should return the implementing connection check when using DefaultConnection + IsConnected() bool // GetMessageCodec should return the spi.MessageCodec in use GetMessageCodec() spi.MessageCodec } @@ -167,7 +167,7 @@ func (d *defaultConnection) Ping() <-chan plc4go.PlcConnectionPingResult { ch <- NewDefaultPlcConnectionPingResult(errors.Errorf("panic-ed %v. Stack: %s", err, debug.Stack())) } }() - if d.GetConnection().IsConnected() { + if d.DefaultConnectionRequirements.IsConnected() { ch <- NewDefaultPlcConnectionPingResult(nil) } else { ch <- NewDefaultPlcConnectionPingResult(errors.New("not connected")) diff --git a/plc4go/spi/default/DefaultConnection_test.go b/plc4go/spi/default/DefaultConnection_test.go index ae262d99fd..5f58f4e516 100644 --- a/plc4go/spi/default/DefaultConnection_test.go +++ b/plc4go/spi/default/DefaultConnection_test.go @@ -340,9 +340,6 @@ func Test_defaultConnection_Close(t *testing.T) { name: "close", setup: func(t *testing.T, fields *fields) { requirements := NewMockDefaultConnectionRequirements(t) - connection := NewMockPlcConnection(t) - connection.EXPECT().Close().Return(nil) - requirements.EXPECT().GetConnection().Return(connection) requirements.EXPECT().GetMessageCodec().Return(nil) fields.DefaultConnectionRequirements = requirements }, @@ -432,7 +429,6 @@ func Test_defaultConnection_Connect(t *testing.T) { } expect := requirements.EXPECT() expect.GetMessageCodec().Return(codec) - expect.GetConnection().Return(NewMockPlcConnection(t)) fields.DefaultConnectionRequirements = requirements }, wantErr: assert.NoError, @@ -623,11 +619,9 @@ func Test_defaultConnection_Ping(t *testing.T) { name: "ping it", setup: func(t *testing.T, fields *fields) { requirements := NewMockDefaultConnectionRequirements(t) - connection := NewMockPlcConnection(t) { - connection.EXPECT().IsConnected().Return(false) + requirements.EXPECT().IsConnected().Return(false) } - requirements.EXPECT().GetConnection().Return(connection) fields.DefaultConnectionRequirements = requirements }, wantAsserter: func(t *testing.T, results <-chan plc4go.PlcConnectionPingResult) bool { @@ -644,11 +638,9 @@ func Test_defaultConnection_Ping(t *testing.T) { name: "ping it connected", setup: func(t *testing.T, fields *fields) { requirements := NewMockDefaultConnectionRequirements(t) - connection := NewMockPlcConnection(t) { - connection.EXPECT().IsConnected().Return(true) + requirements.EXPECT().IsConnected().Return(true) } - requirements.EXPECT().GetConnection().Return(connection) fields.DefaultConnectionRequirements = requirements }, connected: true, diff --git a/plc4go/spi/default/defaultExpectation.go b/plc4go/spi/default/defaultExpectation.go index 156bb92b8c..db9af30d17 100644 --- a/plc4go/spi/default/defaultExpectation.go +++ b/plc4go/spi/default/defaultExpectation.go @@ -30,8 +30,9 @@ import ( ) type defaultExpectation struct { - uuid uuid.UUID - Context context.Context + Uuid uuid.UUID + Ctx context.Context + CancelFunc context.CancelCauseFunc CreationTime time.Time Expiration time.Time AcceptsMessage spi.AcceptsMessage @@ -40,9 +41,11 @@ type defaultExpectation struct { } func newDefaultExpectation(ctx context.Context, ttl time.Duration, acceptsMessage spi.AcceptsMessage, handleMessage spi.HandleMessage, handleError spi.HandleError) *defaultExpectation { + ctx, cancelFunc := context.WithCancelCause(ctx) return &defaultExpectation{ - uuid: uuid.New(), - Context: ctx, + Uuid: uuid.New(), + Ctx: ctx, + CancelFunc: cancelFunc, CreationTime: time.Now(), Expiration: time.Now().Add(ttl), AcceptsMessage: acceptsMessage, @@ -52,7 +55,11 @@ func newDefaultExpectation(ctx context.Context, ttl time.Duration, acceptsMessag } func (d *defaultExpectation) GetContext() context.Context { - return d.Context + return d.Ctx +} + +func (d *defaultExpectation) Cancel(cause error) { + d.CancelFunc(cause) } func (d *defaultExpectation) GetCreationTime() time.Time { @@ -76,5 +83,5 @@ func (d *defaultExpectation) GetHandleError() spi.HandleError { } func (d *defaultExpectation) String() string { - return fmt.Sprintf("Expectation %s (expires at %v in %s)", d.uuid, d.Expiration, time.Until(d.Expiration)) + return fmt.Sprintf("Expectation %s (expires at %v in %s)", d.Uuid, d.Expiration, time.Until(d.Expiration)) } diff --git a/plc4go/spi/default/mocks_test.go b/plc4go/spi/default/mocks_test.go index 2968c43520..ac37d69acc 100644 --- a/plc4go/spi/default/mocks_test.go +++ b/plc4go/spi/default/mocks_test.go @@ -1074,94 +1074,92 @@ func (_m *MockDefaultConnectionRequirements) EXPECT() *MockDefaultConnectionRequ return &MockDefaultConnectionRequirements_Expecter{mock: &_m.Mock} } -// GetConnection provides a mock function for the type MockDefaultConnectionRequirements -func (_mock *MockDefaultConnectionRequirements) GetConnection() plc4go.PlcConnection { +// GetMessageCodec provides a mock function for the type MockDefaultConnectionRequirements +func (_mock *MockDefaultConnectionRequirements) GetMessageCodec() spi.MessageCodec { ret := _mock.Called() if len(ret) == 0 { - panic("no return value specified for GetConnection") + panic("no return value specified for GetMessageCodec") } - var r0 plc4go.PlcConnection - if returnFunc, ok := ret.Get(0).(func() plc4go.PlcConnection); ok { + var r0 spi.MessageCodec + if returnFunc, ok := ret.Get(0).(func() spi.MessageCodec); ok { r0 = returnFunc() } else { if ret.Get(0) != nil { - r0 = ret.Get(0).(plc4go.PlcConnection) + r0 = ret.Get(0).(spi.MessageCodec) } } return r0 } -// MockDefaultConnectionRequirements_GetConnection_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetConnection' -type MockDefaultConnectionRequirements_GetConnection_Call struct { +// MockDefaultConnectionRequirements_GetMessageCodec_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetMessageCodec' +type MockDefaultConnectionRequirements_GetMessageCodec_Call struct { *mock.Call } -// GetConnection is a helper method to define mock.On call -func (_e *MockDefaultConnectionRequirements_Expecter) GetConnection() *MockDefaultConnectionRequirements_GetConnection_Call { - return &MockDefaultConnectionRequirements_GetConnection_Call{Call: _e.mock.On("GetConnection")} +// GetMessageCodec is a helper method to define mock.On call +func (_e *MockDefaultConnectionRequirements_Expecter) GetMessageCodec() *MockDefaultConnectionRequirements_GetMessageCodec_Call { + return &MockDefaultConnectionRequirements_GetMessageCodec_Call{Call: _e.mock.On("GetMessageCodec")} } -func (_c *MockDefaultConnectionRequirements_GetConnection_Call) Run(run func()) *MockDefaultConnectionRequirements_GetConnection_Call { +func (_c *MockDefaultConnectionRequirements_GetMessageCodec_Call) Run(run func()) *MockDefaultConnectionRequirements_GetMessageCodec_Call { _c.Call.Run(func(args mock.Arguments) { run() }) return _c } -func (_c *MockDefaultConnectionRequirements_GetConnection_Call) Return(plcConnection plc4go.PlcConnection) *MockDefaultConnectionRequirements_GetConnection_Call { - _c.Call.Return(plcConnection) +func (_c *MockDefaultConnectionRequirements_GetMessageCodec_Call) Return(messageCodec spi.MessageCodec) *MockDefaultConnectionRequirements_GetMessageCodec_Call { + _c.Call.Return(messageCodec) return _c } -func (_c *MockDefaultConnectionRequirements_GetConnection_Call) RunAndReturn(run func() plc4go.PlcConnection) *MockDefaultConnectionRequirements_GetConnection_Call { +func (_c *MockDefaultConnectionRequirements_GetMessageCodec_Call) RunAndReturn(run func() spi.MessageCodec) *MockDefaultConnectionRequirements_GetMessageCodec_Call { _c.Call.Return(run) return _c } -// GetMessageCodec provides a mock function for the type MockDefaultConnectionRequirements -func (_mock *MockDefaultConnectionRequirements) GetMessageCodec() spi.MessageCodec { +// IsConnected provides a mock function for the type MockDefaultConnectionRequirements +func (_mock *MockDefaultConnectionRequirements) IsConnected() bool { ret := _mock.Called() if len(ret) == 0 { - panic("no return value specified for GetMessageCodec") + panic("no return value specified for IsConnected") } - var r0 spi.MessageCodec - if returnFunc, ok := ret.Get(0).(func() spi.MessageCodec); ok { + var r0 bool + if returnFunc, ok := ret.Get(0).(func() bool); ok { r0 = returnFunc() } else { - if ret.Get(0) != nil { - r0 = ret.Get(0).(spi.MessageCodec) - } + r0 = ret.Get(0).(bool) } return r0 } -// MockDefaultConnectionRequirements_GetMessageCodec_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetMessageCodec' -type MockDefaultConnectionRequirements_GetMessageCodec_Call struct { +// MockDefaultConnectionRequirements_IsConnected_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'IsConnected' +type MockDefaultConnectionRequirements_IsConnected_Call struct { *mock.Call } -// GetMessageCodec is a helper method to define mock.On call -func (_e *MockDefaultConnectionRequirements_Expecter) GetMessageCodec() *MockDefaultConnectionRequirements_GetMessageCodec_Call { - return &MockDefaultConnectionRequirements_GetMessageCodec_Call{Call: _e.mock.On("GetMessageCodec")} +// IsConnected is a helper method to define mock.On call +func (_e *MockDefaultConnectionRequirements_Expecter) IsConnected() *MockDefaultConnectionRequirements_IsConnected_Call { + return &MockDefaultConnectionRequirements_IsConnected_Call{Call: _e.mock.On("IsConnected")} } -func (_c *MockDefaultConnectionRequirements_GetMessageCodec_Call) Run(run func()) *MockDefaultConnectionRequirements_GetMessageCodec_Call { +func (_c *MockDefaultConnectionRequirements_IsConnected_Call) Run(run func()) *MockDefaultConnectionRequirements_IsConnected_Call { _c.Call.Run(func(args mock.Arguments) { run() }) return _c } -func (_c *MockDefaultConnectionRequirements_GetMessageCodec_Call) Return(messageCodec spi.MessageCodec) *MockDefaultConnectionRequirements_GetMessageCodec_Call { - _c.Call.Return(messageCodec) +func (_c *MockDefaultConnectionRequirements_IsConnected_Call) Return(b bool) *MockDefaultConnectionRequirements_IsConnected_Call { + _c.Call.Return(b) return _c } -func (_c *MockDefaultConnectionRequirements_GetMessageCodec_Call) RunAndReturn(run func() spi.MessageCodec) *MockDefaultConnectionRequirements_GetMessageCodec_Call { +func (_c *MockDefaultConnectionRequirements_IsConnected_Call) RunAndReturn(run func() bool) *MockDefaultConnectionRequirements_IsConnected_Call { _c.Call.Return(run) return _c } @@ -4900,6 +4898,46 @@ func (_m *MockExpectation) EXPECT() *MockExpectation_Expecter { return &MockExpectation_Expecter{mock: &_m.Mock} } +// Cancel provides a mock function for the type MockExpectation +func (_mock *MockExpectation) Cancel(cause error) { + _mock.Called(cause) + return +} + +// MockExpectation_Cancel_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Cancel' +type MockExpectation_Cancel_Call struct { + *mock.Call +} + +// Cancel is a helper method to define mock.On call +// - cause error +func (_e *MockExpectation_Expecter) Cancel(cause interface{}) *MockExpectation_Cancel_Call { + return &MockExpectation_Cancel_Call{Call: _e.mock.On("Cancel", cause)} +} + +func (_c *MockExpectation_Cancel_Call) Run(run func(cause error)) *MockExpectation_Cancel_Call { + _c.Call.Run(func(args mock.Arguments) { + var arg0 error + if args[0] != nil { + arg0 = args[0].(error) + } + run( + arg0, + ) + }) + return _c +} + +func (_c *MockExpectation_Cancel_Call) Return() *MockExpectation_Cancel_Call { + _c.Call.Return() + return _c +} + +func (_c *MockExpectation_Cancel_Call) RunAndReturn(run func(cause error)) *MockExpectation_Cancel_Call { + _c.Run(run) + return _c +} + // GetAcceptsMessage provides a mock function for the type MockExpectation func (_mock *MockExpectation) GetAcceptsMessage() spi.AcceptsMessage { ret := _mock.Called() diff --git a/plc4go/spi/mocks_test.go b/plc4go/spi/mocks_test.go index c7ed1abefc..56348c89bd 100644 --- a/plc4go/spi/mocks_test.go +++ b/plc4go/spi/mocks_test.go @@ -1010,6 +1010,46 @@ func (_m *MockExpectation) EXPECT() *MockExpectation_Expecter { return &MockExpectation_Expecter{mock: &_m.Mock} } +// Cancel provides a mock function for the type MockExpectation +func (_mock *MockExpectation) Cancel(cause error) { + _mock.Called(cause) + return +} + +// MockExpectation_Cancel_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Cancel' +type MockExpectation_Cancel_Call struct { + *mock.Call +} + +// Cancel is a helper method to define mock.On call +// - cause error +func (_e *MockExpectation_Expecter) Cancel(cause interface{}) *MockExpectation_Cancel_Call { + return &MockExpectation_Cancel_Call{Call: _e.mock.On("Cancel", cause)} +} + +func (_c *MockExpectation_Cancel_Call) Run(run func(cause error)) *MockExpectation_Cancel_Call { + _c.Call.Run(func(args mock.Arguments) { + var arg0 error + if args[0] != nil { + arg0 = args[0].(error) + } + run( + arg0, + ) + }) + return _c +} + +func (_c *MockExpectation_Cancel_Call) Return() *MockExpectation_Cancel_Call { + _c.Call.Return() + return _c +} + +func (_c *MockExpectation_Cancel_Call) RunAndReturn(run func(cause error)) *MockExpectation_Cancel_Call { + _c.Run(run) + return _c +} + // GetAcceptsMessage provides a mock function for the type MockExpectation func (_mock *MockExpectation) GetAcceptsMessage() AcceptsMessage { ret := _mock.Called() diff --git a/plc4go/spi/tracer/Tracer.go b/plc4go/spi/tracer/Tracer.go index 547b7e227a..b57745f0f8 100644 --- a/plc4go/spi/tracer/Tracer.go +++ b/plc4go/spi/tracer/Tracer.go @@ -108,7 +108,7 @@ func (t *tracer) AddTrace(operation string, message string) { func (t *tracer) AddTransactionalStartTrace(operation string, message string) string { t.m.Lock() defer t.m.Unlock() - transactionId := utils.GenerateId(t.log, 4) + transactionId := utils.GenerateId(4) t.traceEntries = append(t.traceEntries, TraceEntry{ Timestamp: time.Now(), ConnectionId: t.connectionId.Load().(string), diff --git a/plc4go/spi/utils/IdGenerator.go b/plc4go/spi/utils/IdGenerator.go index 86b0a01539..8c7f4a887e 100644 --- a/plc4go/spi/utils/IdGenerator.go +++ b/plc4go/spi/utils/IdGenerator.go @@ -20,17 +20,19 @@ package utils import ( + "crypto/rand" "encoding/hex" - "math/rand" - "github.com/rs/zerolog" + "github.com/google/uuid" ) var randomByteFiller = rand.Read -func GenerateId(localLog zerolog.Logger, numBytes int) string { +func GenerateId(numBytes int) string { transactionIdBytes := make([]byte, numBytes) - n, err := randomByteFiller(transactionIdBytes) - localLog.Trace().Err(err).Int("n", n).Msg("Read n bytes") + _, err := randomByteFiller(transactionIdBytes) + if err != nil { + return err.Error() + uuid.NewString() + } return hex.EncodeToString(transactionIdBytes) } diff --git a/plc4go/spi/utils/IdGenerator_test.go b/plc4go/spi/utils/IdGenerator_test.go index 1d1aeae9c8..987024b02e 100644 --- a/plc4go/spi/utils/IdGenerator_test.go +++ b/plc4go/spi/utils/IdGenerator_test.go @@ -49,7 +49,7 @@ func TestGenerateId(t *testing.T) { randomByteFiller = func(_ []byte) (n int, err error) { return 0, nil } - assert.Equalf(t, tt.want, GenerateId(produceTestLogger(t), tt.args.numBytes), "GenerateId(%v)", tt.args.numBytes) + assert.Equalf(t, tt.want, GenerateId(tt.args.numBytes), "GenerateId(%v)", tt.args.numBytes) }) } }
