This is an automated email from the ASF dual-hosted git repository. sruehl pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/plc4x.git
commit 430655fdf995364e800a7a5b5e938dd42cebcafb Author: Sebastian Rühl <sru...@apache.org> AuthorDate: Wed Jun 21 17:47:35 2023 +0200 refactor(plc4go/spi): move worker starting into a own method --- plc4go/spi/default/DefaultCodec.go | 20 ++++++----- plc4go/spi/default/DefaultCodec_test.go | 61 +++++++++++++++++++++++++-------- 2 files changed, 58 insertions(+), 23 deletions(-) diff --git a/plc4go/spi/default/DefaultCodec.go b/plc4go/spi/default/DefaultCodec.go index 209dff1cd8..6791b150d2 100644 --- a/plc4go/spi/default/DefaultCodec.go +++ b/plc4go/spi/default/DefaultCodec.go @@ -149,8 +149,7 @@ func (m *defaultCodec) ConnectWithContext(ctx context.Context) error { } m.log.Debug().Msg("Message codec currently not running, starting worker now") - m.activeWorker.Add(1) - go m.Work(m.DefaultCodecRequirements) + m.startWorker() m.running.Store(true) m.log.Trace().Msg("connected") return nil @@ -277,7 +276,13 @@ func (m *defaultCodec) HandleMessages(message spi.Message) bool { return messageHandled } -func (m *defaultCodec) Work(codec DefaultCodecRequirements) { +func (m *defaultCodec) startWorker() { + m.log.Trace().Msg("starting worker") + m.activeWorker.Add(1) + go m.Work() +} + +func (m *defaultCodec) Work() { defer m.activeWorker.Done() workerLog := m.log.With().Logger() if !m.traceDefaultMessageCodecWorker { @@ -286,19 +291,18 @@ func (m *defaultCodec) Work(codec DefaultCodecRequirements) { workerLog.Trace().Msg("Starting work") defer workerLog.Trace().Msg("work ended") - defer func(workerLog zerolog.Logger) { + defer func() { if err := recover(); err != nil { // TODO: If this is an error, cast it to an error and log it with "Err(err)" m.log.Error().Msgf("panic-ed %v. Stack: %s", err, debug.Stack()) } if m.running.Load() { workerLog.Warn().Msg("Keep running") - m.activeWorker.Add(1) - go m.Work(codec) + m.startWorker() } else { workerLog.Info().Msg("Worker terminated") } - }(workerLog) + }() // Start an endless loop mainLoop: @@ -361,7 +365,7 @@ mainLoop: if m.customMessageHandling != nil { workerLog.Trace().Msg("Executing custom handling") start := time.Now() - handled := m.customMessageHandling(codec, message) + handled := m.customMessageHandling(m.DefaultCodecRequirements, message) workerLog.Trace().Msgf("custom handling took %s", time.Since(start)) if handled { workerLog.Trace().Msg("Custom handling handled the message") diff --git a/plc4go/spi/default/DefaultCodec_test.go b/plc4go/spi/default/DefaultCodec_test.go index c0d00b5ac0..ca46f49059 100644 --- a/plc4go/spi/default/DefaultCodec_test.go +++ b/plc4go/spi/default/DefaultCodec_test.go @@ -884,14 +884,10 @@ func Test_defaultCodec_Work(t *testing.T) { running bool customMessageHandling func(codec DefaultCodecRequirements, message spi.Message) bool } - type args struct { - codec DefaultCodecRequirements - } tests := []struct { name string fields fields - args args - mockSetup func(t *testing.T, fields *fields, args *args) + mockSetup func(t *testing.T, fields *fields) manipulator func(t *testing.T, codec *defaultCodec) }{ { @@ -900,7 +896,7 @@ func Test_defaultCodec_Work(t *testing.T) { codec.running.Store(true) codec.activeWorker.Add(1) }, - mockSetup: func(t *testing.T, fields *fields, args *args) { + mockSetup: func(t *testing.T, fields *fields) { requirements := NewMockDefaultCodecRequirements(t) fields.DefaultCodecRequirements = requirements }, @@ -941,7 +937,7 @@ func Test_defaultCodec_Work(t *testing.T) { }, }, }, - mockSetup: func(t *testing.T, fields *fields, args *args) { + mockSetup: func(t *testing.T, fields *fields) { requirements := NewMockDefaultCodecRequirements(t) requirements.EXPECT().Receive().Return(nil, errors.New("nope")) fields.DefaultCodecRequirements = requirements @@ -987,7 +983,7 @@ func Test_defaultCodec_Work(t *testing.T) { }, }, }, - mockSetup: func(t *testing.T, fields *fields, args *args) { + mockSetup: func(t *testing.T, fields *fields) { requirements := NewMockDefaultCodecRequirements(t) requirements.EXPECT().Receive().Return(nil, nil) fields.DefaultCodecRequirements = requirements @@ -1033,7 +1029,7 @@ func Test_defaultCodec_Work(t *testing.T) { }, }, }, - mockSetup: func(t *testing.T, fields *fields, args *args) { + mockSetup: func(t *testing.T, fields *fields) { requirements := NewMockDefaultCodecRequirements(t) requirements.EXPECT().Receive().Return(NewMockMessage(t), nil) fields.DefaultCodecRequirements = requirements @@ -1057,7 +1053,7 @@ func Test_defaultCodec_Work(t *testing.T) { }, }, }, - mockSetup: func(t *testing.T, fields *fields, args *args) { + mockSetup: func(t *testing.T, fields *fields) { requirements := NewMockDefaultCodecRequirements(t) requirements.EXPECT().Receive().Return(NewMockMessage(t), nil) fields.DefaultCodecRequirements = requirements @@ -1103,7 +1099,7 @@ func Test_defaultCodec_Work(t *testing.T) { }, }, }, - mockSetup: func(t *testing.T, fields *fields, args *args) { + mockSetup: func(t *testing.T, fields *fields) { requirements := NewMockDefaultCodecRequirements(t) requirements.EXPECT().Receive().Return(nil, errors.New("nope")) fields.DefaultCodecRequirements = requirements @@ -1152,7 +1148,7 @@ func Test_defaultCodec_Work(t *testing.T) { }, }, }, - mockSetup: func(t *testing.T, fields *fields, args *args) { + mockSetup: func(t *testing.T, fields *fields) { requirements := NewMockDefaultCodecRequirements(t) requirements.EXPECT().Receive().Return(NewMockMessage(t), nil) fields.DefaultCodecRequirements = requirements @@ -1201,7 +1197,7 @@ func Test_defaultCodec_Work(t *testing.T) { }, }, }, - mockSetup: func(t *testing.T, fields *fields, args *args) { + mockSetup: func(t *testing.T, fields *fields) { requirements := NewMockDefaultCodecRequirements(t) requirements.EXPECT().Receive().Return(NewMockMessage(t), nil) fields.DefaultCodecRequirements = requirements @@ -1215,7 +1211,7 @@ func Test_defaultCodec_Work(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { if tt.mockSetup != nil { - tt.mockSetup(t, &tt.fields, &tt.args) + tt.mockSetup(t, &tt.fields) } m := &defaultCodec{ DefaultCodecRequirements: tt.fields.DefaultCodecRequirements, @@ -1232,7 +1228,7 @@ func Test_defaultCodec_Work(t *testing.T) { time.Sleep(200 * time.Millisecond) m.running.Store(false) }() - m.Work(tt.args.codec) + m.Work() }) } } @@ -1300,3 +1296,38 @@ func Test_defaultCodec_String(t *testing.T) { }) } } + +func Test_defaultCodec_startWorker(t *testing.T) { + type fields struct { + DefaultCodecRequirements DefaultCodecRequirements + transportInstance transports.TransportInstance + expectations []spi.Expectation + defaultIncomingMessageChannel chan spi.Message + customMessageHandling func(codec DefaultCodecRequirements, message spi.Message) bool + receiveTimeout time.Duration + traceDefaultMessageCodecWorker bool + } + tests := []struct { + name string + fields fields + }{ + { + name: "start it not running", + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + m := &defaultCodec{ + DefaultCodecRequirements: tt.fields.DefaultCodecRequirements, + transportInstance: tt.fields.transportInstance, + expectations: tt.fields.expectations, + defaultIncomingMessageChannel: tt.fields.defaultIncomingMessageChannel, + customMessageHandling: tt.fields.customMessageHandling, + receiveTimeout: tt.fields.receiveTimeout, + traceDefaultMessageCodecWorker: tt.fields.traceDefaultMessageCodecWorker, + log: testutils.ProduceTestingLogger(t), + } + m.startWorker() + }) + } +}