This is an automated email from the ASF dual-hosted git repository.

sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git

commit 430655fdf995364e800a7a5b5e938dd42cebcafb
Author: Sebastian Rühl <sru...@apache.org>
AuthorDate: Wed Jun 21 17:47:35 2023 +0200

    refactor(plc4go/spi): move worker starting into its own method
---
 plc4go/spi/default/DefaultCodec.go      | 20 ++++++-----
 plc4go/spi/default/DefaultCodec_test.go | 61 +++++++++++++++++++++++++--------
 2 files changed, 58 insertions(+), 23 deletions(-)

diff --git a/plc4go/spi/default/DefaultCodec.go b/plc4go/spi/default/DefaultCodec.go
index 209dff1cd8..6791b150d2 100644
--- a/plc4go/spi/default/DefaultCodec.go
+++ b/plc4go/spi/default/DefaultCodec.go
@@ -149,8 +149,7 @@ func (m *defaultCodec) ConnectWithContext(ctx context.Context) error {
        }
 
        m.log.Debug().Msg("Message codec currently not running, starting worker now")
-       m.activeWorker.Add(1)
-       go m.Work(m.DefaultCodecRequirements)
+       m.startWorker()
        m.running.Store(true)
        m.log.Trace().Msg("connected")
        return nil
@@ -277,7 +276,13 @@ func (m *defaultCodec) HandleMessages(message spi.Message) bool {
        return messageHandled
 }
 
-func (m *defaultCodec) Work(codec DefaultCodecRequirements) {
+func (m *defaultCodec) startWorker() {
+       m.log.Trace().Msg("starting worker")
+       m.activeWorker.Add(1)
+       go m.Work()
+}
+
+func (m *defaultCodec) Work() {
        defer m.activeWorker.Done()
        workerLog := m.log.With().Logger()
        if !m.traceDefaultMessageCodecWorker {
@@ -286,19 +291,18 @@ func (m *defaultCodec) Work(codec DefaultCodecRequirements) {
        workerLog.Trace().Msg("Starting work")
        defer workerLog.Trace().Msg("work ended")
 
-       defer func(workerLog zerolog.Logger) {
+       defer func() {
                if err := recover(); err != nil {
                        // TODO: If this is an error, cast it to an error and log it with "Err(err)"
                        m.log.Error().Msgf("panic-ed %v. Stack: %s", err, debug.Stack())
                }
                if m.running.Load() {
                        workerLog.Warn().Msg("Keep running")
-                       m.activeWorker.Add(1)
-                       go m.Work(codec)
+                       m.startWorker()
                } else {
                        workerLog.Info().Msg("Worker terminated")
                }
-       }(workerLog)
+       }()
 
        // Start an endless loop
 mainLoop:
@@ -361,7 +365,7 @@ mainLoop:
                if m.customMessageHandling != nil {
                        workerLog.Trace().Msg("Executing custom handling")
                        start := time.Now()
-                       handled := m.customMessageHandling(codec, message)
+                       handled := m.customMessageHandling(m.DefaultCodecRequirements, message)
                        workerLog.Trace().Msgf("custom handling took %s", time.Since(start))
                        if handled {
                                workerLog.Trace().Msg("Custom handling handled the message")
diff --git a/plc4go/spi/default/DefaultCodec_test.go b/plc4go/spi/default/DefaultCodec_test.go
index c0d00b5ac0..ca46f49059 100644
--- a/plc4go/spi/default/DefaultCodec_test.go
+++ b/plc4go/spi/default/DefaultCodec_test.go
@@ -884,14 +884,10 @@ func Test_defaultCodec_Work(t *testing.T) {
                running                       bool
                customMessageHandling         func(codec DefaultCodecRequirements, message spi.Message) bool
        }
-       type args struct {
-               codec DefaultCodecRequirements
-       }
        tests := []struct {
                name        string
                fields      fields
-               args        args
-               mockSetup   func(t *testing.T, fields *fields, args *args)
+               mockSetup   func(t *testing.T, fields *fields)
                manipulator func(t *testing.T, codec *defaultCodec)
        }{
                {
@@ -900,7 +896,7 @@ func Test_defaultCodec_Work(t *testing.T) {
                                codec.running.Store(true)
                                codec.activeWorker.Add(1)
                        },
-                       mockSetup: func(t *testing.T, fields *fields, args *args) {
+                       mockSetup: func(t *testing.T, fields *fields) {
                                requirements := NewMockDefaultCodecRequirements(t)
                                fields.DefaultCodecRequirements = requirements
                        },
@@ -941,7 +937,7 @@ func Test_defaultCodec_Work(t *testing.T) {
                                        },
                                },
                        },
-                       mockSetup: func(t *testing.T, fields *fields, args *args) {
+                       mockSetup: func(t *testing.T, fields *fields) {
                                requirements := NewMockDefaultCodecRequirements(t)
                                requirements.EXPECT().Receive().Return(nil, errors.New("nope"))
                                fields.DefaultCodecRequirements = requirements
@@ -987,7 +983,7 @@ func Test_defaultCodec_Work(t *testing.T) {
                                        },
                                },
                        },
-                       mockSetup: func(t *testing.T, fields *fields, args *args) {
+                       mockSetup: func(t *testing.T, fields *fields) {
                                requirements := NewMockDefaultCodecRequirements(t)
                                requirements.EXPECT().Receive().Return(nil, nil)
                                fields.DefaultCodecRequirements = requirements
@@ -1033,7 +1029,7 @@ func Test_defaultCodec_Work(t *testing.T) {
                                        },
                                },
                        },
-                       mockSetup: func(t *testing.T, fields *fields, args *args) {
+                       mockSetup: func(t *testing.T, fields *fields) {
                                requirements := NewMockDefaultCodecRequirements(t)
                                requirements.EXPECT().Receive().Return(NewMockMessage(t), nil)
                                fields.DefaultCodecRequirements = requirements
@@ -1057,7 +1053,7 @@ func Test_defaultCodec_Work(t *testing.T) {
                                        },
                                },
                        },
-                       mockSetup: func(t *testing.T, fields *fields, args *args) {
+                       mockSetup: func(t *testing.T, fields *fields) {
                                requirements := NewMockDefaultCodecRequirements(t)
                                requirements.EXPECT().Receive().Return(NewMockMessage(t), nil)
                                fields.DefaultCodecRequirements = requirements
@@ -1103,7 +1099,7 @@ func Test_defaultCodec_Work(t *testing.T) {
                                        },
                                },
                        },
-                       mockSetup: func(t *testing.T, fields *fields, args *args) {
+                       mockSetup: func(t *testing.T, fields *fields) {
                                requirements := NewMockDefaultCodecRequirements(t)
                                requirements.EXPECT().Receive().Return(nil, errors.New("nope"))
                                fields.DefaultCodecRequirements = requirements
@@ -1152,7 +1148,7 @@ func Test_defaultCodec_Work(t *testing.T) {
                                        },
                                },
                        },
-                       mockSetup: func(t *testing.T, fields *fields, args *args) {
+                       mockSetup: func(t *testing.T, fields *fields) {
                                requirements := NewMockDefaultCodecRequirements(t)
                                requirements.EXPECT().Receive().Return(NewMockMessage(t), nil)
                                fields.DefaultCodecRequirements = requirements
@@ -1201,7 +1197,7 @@ func Test_defaultCodec_Work(t *testing.T) {
                                        },
                                },
                        },
-                       mockSetup: func(t *testing.T, fields *fields, args *args) {
+                       mockSetup: func(t *testing.T, fields *fields) {
                                requirements := NewMockDefaultCodecRequirements(t)
                                requirements.EXPECT().Receive().Return(NewMockMessage(t), nil)
                                fields.DefaultCodecRequirements = requirements
@@ -1215,7 +1211,7 @@ func Test_defaultCodec_Work(t *testing.T) {
        for _, tt := range tests {
                t.Run(tt.name, func(t *testing.T) {
                        if tt.mockSetup != nil {
-                               tt.mockSetup(t, &tt.fields, &tt.args)
+                               tt.mockSetup(t, &tt.fields)
                        }
                        m := &defaultCodec{
                                DefaultCodecRequirements:      tt.fields.DefaultCodecRequirements,
@@ -1232,7 +1228,7 @@ func Test_defaultCodec_Work(t *testing.T) {
                                time.Sleep(200 * time.Millisecond)
                                m.running.Store(false)
                        }()
-                       m.Work(tt.args.codec)
+                       m.Work()
                })
        }
 }
@@ -1300,3 +1296,38 @@ func Test_defaultCodec_String(t *testing.T) {
                })
        }
 }
+
+func Test_defaultCodec_startWorker(t *testing.T) {
+       type fields struct {
+               DefaultCodecRequirements       DefaultCodecRequirements
+               transportInstance              transports.TransportInstance
+               expectations                   []spi.Expectation
+               defaultIncomingMessageChannel  chan spi.Message
+               customMessageHandling          func(codec DefaultCodecRequirements, message spi.Message) bool
+               receiveTimeout                 time.Duration
+               traceDefaultMessageCodecWorker bool
+       }
+       tests := []struct {
+               name   string
+               fields fields
+       }{
+               {
+                       name: "start it not running",
+               },
+       }
+       for _, tt := range tests {
+               t.Run(tt.name, func(t *testing.T) {
+                       m := &defaultCodec{
+                               DefaultCodecRequirements:       tt.fields.DefaultCodecRequirements,
+                               transportInstance:              tt.fields.transportInstance,
+                               expectations:                   tt.fields.expectations,
+                               defaultIncomingMessageChannel:  tt.fields.defaultIncomingMessageChannel,
+                               customMessageHandling:          tt.fields.customMessageHandling,
+                               receiveTimeout:                 tt.fields.receiveTimeout,
+                               traceDefaultMessageCodecWorker: tt.fields.traceDefaultMessageCodecWorker,
+                               log:                            testutils.ProduceTestingLogger(t),
+                       }
+                       m.startWorker()
+               })
+       }
+}

Reply via email to