This is an automated email from the ASF dual-hosted git repository.

sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git


The following commit(s) were added to refs/heads/develop by this push:
     new 474d17901b test(plc4go/cbus): cleanup resources
474d17901b is described below

commit 474d17901b04f164d9300cc4687ad642085b6bfa
Author: Sebastian Rühl <[email protected]>
AuthorDate: Wed May 31 16:33:02 2023 +0200

    test(plc4go/cbus): cleanup resources
---
 plc4go/internal/cbus/Connection_test.go | 370 +++++++++++++++++++++++---------
 plc4go/internal/cbus/Discoverer.go      |   6 +
 plc4go/internal/cbus/Subscriber_test.go |  34 ++-
 plc4go/spi/default/DefaultConnection.go |  11 +-
 4 files changed, 313 insertions(+), 108 deletions(-)

diff --git a/plc4go/internal/cbus/Connection_test.go 
b/plc4go/internal/cbus/Connection_test.go
index 341a2a275c..80d0e04a71 100644
--- a/plc4go/internal/cbus/Connection_test.go
+++ b/plc4go/internal/cbus/Connection_test.go
@@ -31,6 +31,7 @@ import (
        "github.com/apache/plc4x/plc4go/spi/transactions"
        "github.com/apache/plc4x/plc4go/spi/transports"
        "github.com/apache/plc4x/plc4go/spi/transports/test"
+       "github.com/apache/plc4x/plc4go/spi/utils"
        "github.com/rs/zerolog"
        "github.com/stretchr/testify/assert"
        "net/url"
@@ -185,7 +186,7 @@ func TestConnection_ConnectWithContext(t *testing.T) {
 
                                // Build the default connection
                                fields.DefaultConnection = 
_default.NewDefaultConnection(nil, loggerOption)
-                               fields.messageCodec = NewMessageCodec(func() 
transports.TransportInstance {
+                               codec := NewMessageCodec(func() 
transports.TransportInstance {
                                        transport := 
test.NewTransport(loggerOption)
                                        ti, err := 
transport.CreateTransportInstance(url.URL{Scheme: "test"}, nil, loggerOption)
                                        if err != nil {
@@ -194,6 +195,10 @@ func TestConnection_ConnectWithContext(t *testing.T) {
                                        }
                                        return ti
                                }(), loggerOption)
+                               t.Cleanup(func() {
+                                       assert.NoError(t, codec.Disconnect())
+                               })
+                               fields.messageCodec = codec
                        },
                        wantAsserter: func(t *testing.T, results <-chan 
plc4go.PlcConnectionConnectResult) bool {
                                assert.NotNil(t, results)
@@ -829,13 +834,22 @@ func TestConnection_fireConnectionError(t *testing.T) {
                name          string
                fields        fields
                args          args
+               setup         func(t *testing.T, fields *fields, args *args)
                chanValidator func(*testing.T, chan<- 
plc4go.PlcConnectionConnectResult) bool
        }{
                {
                        name: "instant connect",
-                       fields: fields{
-                               DefaultConnection: 
_default.NewDefaultConnection(nil),
-                               messageCodec: NewMessageCodec(func() 
transports.TransportInstance {
+                       setup: func(t *testing.T, fields *fields, args *args) {
+                               // Setup logger
+                               logger := testutils.ProduceTestingLogger(t)
+
+                               loggerOption := options.WithCustomLogger(logger)
+
+                               // Set the model logger to the logger above
+                               testutils.SetToTestingLogger(t, 
readWriteModel.Plc4xModelLog)
+
+                               fields.DefaultConnection = 
_default.NewDefaultConnection(nil, loggerOption)
+                               codec := NewMessageCodec(func() 
transports.TransportInstance {
                                        transport := test.NewTransport()
                                        ti, err := 
transport.CreateTransportInstance(url.URL{Scheme: "test"}, nil)
                                        if err != nil {
@@ -843,7 +857,11 @@ func TestConnection_fireConnectionError(t *testing.T) {
                                                t.FailNow()
                                        }
                                        return ti
-                               }()),
+                               }(), loggerOption)
+                               t.Cleanup(func() {
+                                       assert.NoError(t, codec.Disconnect())
+                               })
+                               fields.messageCodec = codec
                        },
                        chanValidator: func(_ *testing.T, _ chan<- 
plc4go.PlcConnectionConnectResult) bool {
                                return true
@@ -852,8 +870,21 @@ func TestConnection_fireConnectionError(t *testing.T) {
                {
                        name: "notified connect",
                        fields: fields{
-                               DefaultConnection: 
_default.NewDefaultConnection(nil),
-                               messageCodec: NewMessageCodec(func() 
transports.TransportInstance {
+                               driverContext: DriverContext{
+                                       awaitSetupComplete: true,
+                               },
+                       },
+                       setup: func(t *testing.T, fields *fields, args *args) {
+                               // Setup logger
+                               logger := testutils.ProduceTestingLogger(t)
+
+                               loggerOption := options.WithCustomLogger(logger)
+
+                               // Set the model logger to the logger above
+                               testutils.SetToTestingLogger(t, 
readWriteModel.Plc4xModelLog)
+
+                               fields.DefaultConnection = 
_default.NewDefaultConnection(nil, loggerOption)
+                               codec := NewMessageCodec(func() 
transports.TransportInstance {
                                        transport := test.NewTransport()
                                        ti, err := 
transport.CreateTransportInstance(url.URL{Scheme: "test"}, nil)
                                        if err != nil {
@@ -861,10 +892,11 @@ func TestConnection_fireConnectionError(t *testing.T) {
                                                t.FailNow()
                                        }
                                        return ti
-                               }()),
-                               driverContext: DriverContext{
-                                       awaitSetupComplete: true,
-                               },
+                               }(), loggerOption)
+                               t.Cleanup(func() {
+                                       assert.NoError(t, codec.Disconnect())
+                               })
+                               fields.messageCodec = codec
                        },
                        args: args{ch: make(chan<- 
plc4go.PlcConnectionConnectResult, 1)},
                        chanValidator: func(t *testing.T, results chan<- 
plc4go.PlcConnectionConnectResult) bool {
@@ -875,6 +907,9 @@ func TestConnection_fireConnectionError(t *testing.T) {
        }
        for _, tt := range tests {
                t.Run(tt.name, func(t *testing.T) {
+                       if tt.setup != nil {
+                               tt.setup(t, &tt.fields, &tt.args)
+                       }
                        c := &Connection{
                                DefaultConnection: tt.fields.DefaultConnection,
                                messageCodec:      tt.fields.messageCodec,
@@ -946,7 +981,7 @@ func TestConnection_sendCalDataWrite(t *testing.T) {
                                loggerOption := options.WithCustomLogger(logger)
 
                                fields.DefaultConnection = 
_default.NewDefaultConnection(nil, loggerOption)
-                               fields.messageCodec = NewMessageCodec(func() 
transports.TransportInstance {
+                               codec := NewMessageCodec(func() 
transports.TransportInstance {
                                        transport := 
test.NewTransport(loggerOption)
                                        ti, err := 
transport.CreateTransportInstance(url.URL{Scheme: "test"}, nil, loggerOption)
                                        if err != nil {
@@ -955,6 +990,10 @@ func TestConnection_sendCalDataWrite(t *testing.T) {
                                        }
                                        return ti
                                }())
+                               t.Cleanup(func() {
+                                       assert.NoError(t, codec.Disconnect())
+                               })
+                               fields.messageCodec = codec
                        },
                        want: false,
                },
@@ -1003,22 +1042,11 @@ func TestConnection_sendReset(t *testing.T) {
                name   string
                fields fields
                args   args
+               setup  func(t *testing.T, fields *fields, args *args)
                wantOk bool
        }{
                {
                        name: "send reset",
-                       fields: fields{
-                               DefaultConnection: 
_default.NewDefaultConnection(nil),
-                               messageCodec: NewMessageCodec(func() 
transports.TransportInstance {
-                                       transport := test.NewTransport()
-                                       ti, err := 
transport.CreateTransportInstance(url.URL{Scheme: "test"}, nil)
-                                       if err != nil {
-                                               t.Error(err)
-                                               t.FailNow()
-                                       }
-                                       return ti
-                               }()),
-                       },
                        args: args{
                                ctx: context.Background(),
                                ch:  make(chan 
plc4go.PlcConnectionConnectResult, 1),
@@ -1032,11 +1060,38 @@ func TestConnection_sendReset(t *testing.T) {
                                }(),
                                sendOutErrorNotification: false,
                        },
+                       setup: func(t *testing.T, fields *fields, args *args) {
+                               // Setup logger
+                               logger := testutils.ProduceTestingLogger(t)
+
+                               loggerOption := options.WithCustomLogger(logger)
+
+                               // Set the model logger to the logger above
+                               testutils.SetToTestingLogger(t, 
readWriteModel.Plc4xModelLog)
+
+                               fields.DefaultConnection = 
_default.NewDefaultConnection(nil, loggerOption)
+                               codec := NewMessageCodec(func() 
transports.TransportInstance {
+                                       transport := test.NewTransport()
+                                       ti, err := 
transport.CreateTransportInstance(url.URL{Scheme: "test"}, nil)
+                                       if err != nil {
+                                               t.Error(err)
+                                               t.FailNow()
+                                       }
+                                       return ti
+                               }(), loggerOption)
+                               t.Cleanup(func() {
+                                       assert.NoError(t, codec.Disconnect())
+                               })
+                               fields.messageCodec = codec
+                       },
                        wantOk: false,
                },
        }
        for _, tt := range tests {
                t.Run(tt.name, func(t *testing.T) {
+                       if tt.setup != nil {
+                               tt.setup(t, &tt.fields, &tt.args)
+                       }
                        c := &Connection{
                                DefaultConnection: tt.fields.DefaultConnection,
                                messageCodec:      tt.fields.messageCodec,
@@ -1104,7 +1159,7 @@ func TestConnection_setApplicationFilter(t *testing.T) {
 
                                // Setup connection
                                fields.DefaultConnection = 
_default.NewDefaultConnection(nil, loggerOption)
-                               fields.messageCodec = NewMessageCodec(func() 
transports.TransportInstance {
+                               codec := NewMessageCodec(func() 
transports.TransportInstance {
                                        transport := 
test.NewTransport(loggerOption)
                                        ti, err := 
transport.CreateTransportInstance(url.URL{Scheme: "test"}, nil, loggerOption)
                                        if err != nil {
@@ -1113,6 +1168,10 @@ func TestConnection_setApplicationFilter(t *testing.T) {
                                        }
                                        return ti
                                }(), loggerOption)
+                               t.Cleanup(func() {
+                                       assert.NoError(t, codec.Disconnect())
+                               })
+                               fields.messageCodec = codec
                        },
                        wantOk: false,
                },
@@ -1165,18 +1224,6 @@ func TestConnection_setInterface1PowerUpSettings(t 
*testing.T) {
        }{
                {
                        name: "set interface 1 PUN options (failing)",
-                       fields: fields{
-                               DefaultConnection: 
_default.NewDefaultConnection(nil),
-                               messageCodec: NewMessageCodec(func() 
transports.TransportInstance {
-                                       transport := test.NewTransport()
-                                       ti, err := 
transport.CreateTransportInstance(url.URL{Scheme: "test"}, nil)
-                                       if err != nil {
-                                               t.Error(err)
-                                               t.FailNow()
-                                       }
-                                       return ti
-                               }()),
-                       },
                        args: args{
                                ctx: context.Background(),
                                ch:  make(chan 
plc4go.PlcConnectionConnectResult, 1),
@@ -1190,7 +1237,30 @@ func TestConnection_setInterface1PowerUpSettings(t 
*testing.T) {
                                }(),
                        },
                        setup: func(t *testing.T, fields *fields, args *args) {
+                               // Setup logger
+                               logger := testutils.ProduceTestingLogger(t)
+                               fields.log = logger
+
                                testutils.SetToTestingLogger(t, 
readWriteModel.Plc4xModelLog)
+
+                               // Custom option for that
+                               loggerOption := options.WithCustomLogger(logger)
+
+                               // Setup connection
+                               fields.DefaultConnection = 
_default.NewDefaultConnection(nil, loggerOption)
+                               codec := NewMessageCodec(func() 
transports.TransportInstance {
+                                       transport := 
test.NewTransport(loggerOption)
+                                       ti, err := 
transport.CreateTransportInstance(url.URL{Scheme: "test"}, nil, loggerOption)
+                                       if err != nil {
+                                               t.Error(err)
+                                               t.FailNow()
+                                       }
+                                       return ti
+                               }(), loggerOption)
+                               t.Cleanup(func() {
+                                       assert.NoError(t, codec.Disconnect())
+                               })
+                               fields.messageCodec = codec
                        },
                        wantOk: false,
                },
@@ -1243,18 +1313,6 @@ func TestConnection_setInterfaceOptions1(t *testing.T) {
        }{
                {
                        name: "set interface 1 options (failing)",
-                       fields: fields{
-                               DefaultConnection: 
_default.NewDefaultConnection(nil),
-                               messageCodec: NewMessageCodec(func() 
transports.TransportInstance {
-                                       transport := test.NewTransport()
-                                       ti, err := 
transport.CreateTransportInstance(url.URL{Scheme: "test"}, nil)
-                                       if err != nil {
-                                               t.Error(err)
-                                               t.FailNow()
-                                       }
-                                       return ti
-                               }()),
-                       },
                        args: args{
                                ctx: context.Background(),
                                ch:  make(chan 
plc4go.PlcConnectionConnectResult, 1),
@@ -1268,7 +1326,30 @@ func TestConnection_setInterfaceOptions1(t *testing.T) {
                                }(),
                        },
                        setup: func(t *testing.T, fields *fields) {
+                               // Setup logger
+                               logger := testutils.ProduceTestingLogger(t)
+                               fields.log = logger
+
                                testutils.SetToTestingLogger(t, 
readWriteModel.Plc4xModelLog)
+
+                               // Custom option for that
+                               loggerOption := options.WithCustomLogger(logger)
+
+                               // Setup connection
+                               fields.DefaultConnection = 
_default.NewDefaultConnection(nil, loggerOption)
+                               codec := NewMessageCodec(func() 
transports.TransportInstance {
+                                       transport := 
test.NewTransport(loggerOption)
+                                       ti, err := 
transport.CreateTransportInstance(url.URL{Scheme: "test"}, nil, loggerOption)
+                                       if err != nil {
+                                               t.Error(err)
+                                               t.FailNow()
+                                       }
+                                       return ti
+                               }(), loggerOption)
+                               t.Cleanup(func() {
+                                       assert.NoError(t, codec.Disconnect())
+                               })
+                               fields.messageCodec = codec
                        },
                        want: false,
                },
@@ -1321,18 +1402,6 @@ func TestConnection_setInterfaceOptions3(t *testing.T) {
        }{
                {
                        name: "set interface 3 options (failing)",
-                       fields: fields{
-                               DefaultConnection: 
_default.NewDefaultConnection(nil),
-                               messageCodec: NewMessageCodec(func() 
transports.TransportInstance {
-                                       transport := test.NewTransport()
-                                       ti, err := 
transport.CreateTransportInstance(url.URL{Scheme: "test"}, nil)
-                                       if err != nil {
-                                               t.Error(err)
-                                               t.FailNow()
-                                       }
-                                       return ti
-                               }()),
-                       },
                        args: args{
                                ctx: context.Background(),
                                ch:  make(chan 
plc4go.PlcConnectionConnectResult, 1),
@@ -1346,7 +1415,30 @@ func TestConnection_setInterfaceOptions3(t *testing.T) {
                                }(),
                        },
                        setup: func(t *testing.T, fields *fields) {
+                               // Setup logger
+                               logger := testutils.ProduceTestingLogger(t)
+                               fields.log = logger
+
                                testutils.SetToTestingLogger(t, 
readWriteModel.Plc4xModelLog)
+
+                               // Custom option for that
+                               loggerOption := options.WithCustomLogger(logger)
+
+                               // Setup connection
+                               fields.DefaultConnection = 
_default.NewDefaultConnection(nil, loggerOption)
+                               codec := NewMessageCodec(func() 
transports.TransportInstance {
+                                       transport := 
test.NewTransport(loggerOption)
+                                       ti, err := 
transport.CreateTransportInstance(url.URL{Scheme: "test"}, nil, loggerOption)
+                                       if err != nil {
+                                               t.Error(err)
+                                               t.FailNow()
+                                       }
+                                       return ti
+                               }(), loggerOption)
+                               t.Cleanup(func() {
+                                       assert.NoError(t, codec.Disconnect())
+                               })
+                               fields.messageCodec = codec
                        },
                        wantOk: false,
                },
@@ -1410,19 +1502,21 @@ func TestConnection_setupConnection(t *testing.T) {
                                // Custom option for that
                                loggerOption := options.WithCustomLogger(logger)
 
-                               // Build the default connection
+                               // Setup connection
                                fields.DefaultConnection = 
_default.NewDefaultConnection(nil, loggerOption)
-
-                               // Build the message codec
-                               fields.messageCodec = NewMessageCodec(func() 
transports.TransportInstance {
-                                       transport := test.NewTransport()
-                                       ti, err := 
transport.CreateTransportInstance(url.URL{Scheme: "test"}, nil)
+                               codec := NewMessageCodec(func() 
transports.TransportInstance {
+                                       transport := 
test.NewTransport(loggerOption)
+                                       ti, err := 
transport.CreateTransportInstance(url.URL{Scheme: "test"}, nil, loggerOption)
                                        if err != nil {
                                                t.Error(err)
                                                t.FailNow()
                                        }
                                        return ti
                                }(), loggerOption)
+                               t.Cleanup(func() {
+                                       assert.NoError(t, codec.Disconnect())
+                               })
+                               fields.messageCodec = codec
                        },
                },
                {
@@ -1475,6 +1569,9 @@ func TestConnection_setupConnection(t *testing.T) {
                                        t.Error(err)
                                        t.FailNow()
                                }
+                               t.Cleanup(func() {
+                                       assert.NoError(t, codec.Disconnect())
+                               })
 
                                fields.messageCodec = codec
                        },
@@ -1541,6 +1638,9 @@ func TestConnection_setupConnection(t *testing.T) {
                                        t.Error(err)
                                        t.FailNow()
                                }
+                               t.Cleanup(func() {
+                                       assert.NoError(t, codec.Disconnect())
+                               })
 
                                fields.messageCodec = codec
                        },
@@ -1613,6 +1713,9 @@ func TestConnection_setupConnection(t *testing.T) {
                                        t.Error(err)
                                        t.FailNow()
                                }
+                               t.Cleanup(func() {
+                                       assert.NoError(t, codec.Disconnect())
+                               })
 
                                fields.messageCodec = codec
                        },
@@ -1691,6 +1794,10 @@ func TestConnection_setupConnection(t *testing.T) {
                                        t.Error(err)
                                        t.FailNow()
                                }
+                               t.Cleanup(func() {
+                                       assert.NoError(t, codec.Disconnect())
+                               })
+
                                fields.messageCodec = codec
                        },
                },
@@ -1755,12 +1862,15 @@ func TestConnection_setupConnection(t *testing.T) {
                                                }
                                        })
                                        codec := 
NewMessageCodec(transportInstance)
-                                       err = codec.Connect()
-                                       if err != nil {
+                                       if err = codec.Connect(); err != nil {
                                                t.Error(err)
                                                t.FailNow()
                                                return nil
                                        }
+                                       t.Cleanup(func() {
+                                               assert.NoError(t, 
codec.Disconnect())
+                                       })
+
                                        return codec
                                }(),
                        },
@@ -1782,7 +1892,7 @@ func TestConnection_setupConnection(t *testing.T) {
                                fields.DefaultConnection = 
_default.NewDefaultConnection(nil, loggerOption)
 
                                // Build the message codec
-                               fields.messageCodec = NewMessageCodec(func() 
transports.TransportInstance {
+                               codec := NewMessageCodec(func() 
transports.TransportInstance {
                                        transport := test.NewTransport()
                                        ti, err := 
transport.CreateTransportInstance(url.URL{Scheme: "test"}, nil)
                                        if err != nil {
@@ -1791,6 +1901,10 @@ func TestConnection_setupConnection(t *testing.T) {
                                        }
                                        return ti
                                }(), loggerOption)
+                               t.Cleanup(func() {
+                                       assert.NoError(t, codec.Disconnect())
+                               })
+                               fields.messageCodec = codec
                        },
                },
        }
@@ -1834,47 +1948,69 @@ func TestConnection_startSubscriptionHandler(t 
*testing.T) {
        }{
                {
                        name: "just start",
-                       fields: fields{
-                               DefaultConnection: 
_default.NewDefaultConnection(nil),
+                       setup: func(t *testing.T, fields *fields) {
+                               // Setup logger
+                               logger := testutils.ProduceTestingLogger(t)
+                               fields.log = logger
+                               loggerOption := options.WithCustomLogger(logger)
+
+                               // Set the model logger to the logger above
+                               testutils.SetToTestingLogger(t, 
readWriteModel.Plc4xModelLog)
+
+                               fields.DefaultConnection = 
_default.NewDefaultConnection(nil, loggerOption)
                        },
                },
                {
                        name: "just start and feed (no subs)",
-                       fields: fields{
-                               DefaultConnection: func() 
_default.DefaultConnection {
-                                       defaultConnection := 
_default.NewDefaultConnection(nil)
-                                       defaultConnection.SetConnected(true)
-                                       return defaultConnection
-                               }(),
-                               messageCodec: func() *MessageCodec {
-                                       messageCodec := NewMessageCodec(nil)
-                                       go func() {
-                                               messageCodec.monitoredMMIs <- 
nil
-                                               messageCodec.monitoredSALs <- 
nil
-                                       }()
-                                       return messageCodec
-                               }(),
+                       setup: func(t *testing.T, fields *fields) {
+                               // Setup logger
+                               logger := testutils.ProduceTestingLogger(t)
+                               fields.log = logger
+                               loggerOption := options.WithCustomLogger(logger)
+
+                               // Set the model logger to the logger above
+                               testutils.SetToTestingLogger(t, 
readWriteModel.Plc4xModelLog)
+
+                               defaultConnection := 
_default.NewDefaultConnection(nil, loggerOption)
+                               defaultConnection.SetConnected(true)
+                               fields.DefaultConnection = defaultConnection
+
+                               codec := NewMessageCodec(nil, loggerOption)
+                               go func() {
+                                       codec.monitoredMMIs <- nil
+                                       codec.monitoredSALs <- nil
+                               }()
+                               t.Cleanup(func() {
+                                       assert.NoError(t, codec.Disconnect())
+                               })
+                               fields.messageCodec = codec
                        },
                },
                {
                        name: "just start and feed",
-                       fields: fields{
-                               DefaultConnection: func() 
_default.DefaultConnection {
-                                       defaultConnection := 
_default.NewDefaultConnection(nil)
-                                       defaultConnection.SetConnected(true)
-                                       return defaultConnection
-                               }(),
-                               messageCodec: func() *MessageCodec {
-                                       messageCodec := NewMessageCodec(nil)
-                                       go func() {
-                                               messageCodec.monitoredMMIs <- 
readWriteModel.NewCALReplyShort(0, nil, nil, nil)
-                                               messageCodec.monitoredSALs <- 
readWriteModel.NewMonitoredSAL(0, nil)
-                                       }()
-                                       return messageCodec
-                               }(),
-                       },
                        setup: func(t *testing.T, fields *fields) {
+                               // Setup logger
+                               logger := testutils.ProduceTestingLogger(t)
+                               fields.log = logger
+                               loggerOption := options.WithCustomLogger(logger)
+
+                               // Set the model logger to the logger above
+                               testutils.SetToTestingLogger(t, 
readWriteModel.Plc4xModelLog)
+
+                               defaultConnection := 
_default.NewDefaultConnection(nil, loggerOption)
+                               defaultConnection.SetConnected(true)
+                               fields.DefaultConnection = defaultConnection
+
                                fields.subscribers = 
[]*Subscriber{NewSubscriber(nil, 
options.WithCustomLogger(testutils.ProduceTestingLogger(t)))}
+                               codec := NewMessageCodec(nil, loggerOption)
+                               go func() {
+                                       codec.monitoredMMIs <- 
readWriteModel.NewCALReplyShort(0, nil, nil, nil)
+                                       codec.monitoredSALs <- 
readWriteModel.NewMonitoredSAL(0, nil)
+                               }()
+                               t.Cleanup(func() {
+                                       assert.NoError(t, codec.Disconnect())
+                               })
+                               fields.messageCodec = codec
                        },
                },
        }
@@ -1909,10 +2045,25 @@ func TestNewConnection(t *testing.T) {
        tests := []struct {
                name       string
                args       args
+               setup      func(t *testing.T, args *args)
                wantAssert func(*testing.T, *Connection) bool
        }{
                {
                        name: "just create the connection",
+                       setup: func(t *testing.T, args *args) {
+                               // Setup logger
+                               logger := testutils.ProduceTestingLogger(t)
+                               loggerOption := options.WithCustomLogger(logger)
+
+                               // Set the model logger to the logger above
+                               testutils.SetToTestingLogger(t, 
readWriteModel.Plc4xModelLog)
+
+                               codec := NewMessageCodec(nil, loggerOption)
+                               t.Cleanup(func() {
+                                       assert.NoError(t, codec.Disconnect())
+                               })
+                               args.messageCodec = codec
+                       },
                        wantAssert: func(t *testing.T, connection *Connection) 
bool {
                                return assert.NotNil(t, connection)
                        },
@@ -1920,7 +2071,22 @@ func TestNewConnection(t *testing.T) {
        }
        for _, tt := range tests {
                t.Run(tt.name, func(t *testing.T) {
-                       assert.True(t, tt.wantAssert(t, 
NewConnection(tt.args.messageCodec, tt.args.configuration, 
tt.args.driverContext, tt.args.tagHandler, tt.args.tm, tt.args.options)), 
"NewConnection(%v, %v, %v, %v, %v, %v)", tt.args.messageCodec, 
tt.args.configuration, tt.args.driverContext, tt.args.tagHandler, tt.args.tm, 
tt.args.options)
+                       if tt.setup != nil {
+                               tt.setup(t, &tt.args)
+                       }
+                       connection := NewConnection(tt.args.messageCodec, 
tt.args.configuration, tt.args.driverContext, tt.args.tagHandler, tt.args.tm, 
tt.args.options)
+                       t.Cleanup(func() {
+                               timer := time.NewTimer(1 * time.Second)
+                               t.Cleanup(func() {
+                                       utils.CleanupTimer(timer)
+                               })
+                               select {
+                               case <-connection.Close():
+                               case <-timer.C:
+                                       t.Error("timeout")
+                               }
+                       })
+                       assert.True(t, tt.wantAssert(t, connection), 
"NewConnection(%v, %v, %v, %v, %v, %v)", tt.args.messageCodec, 
tt.args.configuration, tt.args.driverContext, tt.args.tagHandler, tt.args.tm, 
tt.args.options)
                })
        }
 }
diff --git a/plc4go/internal/cbus/Discoverer.go 
b/plc4go/internal/cbus/Discoverer.go
index 2032b0bae5..c6495aaa7f 100644
--- a/plc4go/internal/cbus/Discoverer.go
+++ b/plc4go/internal/cbus/Discoverer.go
@@ -213,6 +213,12 @@ func (d *Discoverer) 
createDeviceScanDispatcher(tcpTransportInstance *tcp.Transp
                        transportInstanceLogger.Debug().Err(err).Msg("Error 
connecting")
                        return
                }
+               defer func() {
+                       // Disconnect codec when done
+                       if err := codec.Disconnect(); err != nil {
+                               d.log.Warn().Err(err).Msg("Error disconnecting 
codec")
+                       }
+               }()
 
                // Prepare the discovery packet data
                cBusOptions := readWriteModel.NewCBusOptions(false, false, 
false, false, false, false, false, false, true)
diff --git a/plc4go/internal/cbus/Subscriber_test.go 
b/plc4go/internal/cbus/Subscriber_test.go
index 6ffc4f7425..d5d794c950 100644
--- a/plc4go/internal/cbus/Subscriber_test.go
+++ b/plc4go/internal/cbus/Subscriber_test.go
@@ -21,6 +21,9 @@ package cbus
 
 import (
        "context"
+       "github.com/apache/plc4x/plc4go/spi/options"
+       "github.com/apache/plc4x/plc4go/spi/testutils"
+       "github.com/apache/plc4x/plc4go/spi/utils"
        "testing"
        "time"
 
@@ -67,17 +70,39 @@ func TestSubscriber_Subscribe(t *testing.T) {
                name         string
                fields       fields
                args         args
+               setup        func(t *testing.T, fields *fields, args *args)
                wantAsserter func(t *testing.T, results <-chan 
apiModel.PlcSubscriptionRequestResult) bool
        }{
                {
                        name: "just subscribe",
-                       fields: fields{
-                               connection: NewConnection(nil, Configuration{}, 
DriverContext{}, nil, nil, nil),
-                       },
                        args: args{
                                in0:                 context.Background(),
                                subscriptionRequest: 
spiModel.NewDefaultPlcSubscriptionRequest(nil, []string{"blub"}, 
map[string]apiModel.PlcTag{"blub": 
NewMMIMonitorTag(readWriteModel.NewUnitAddress(1), nil, 1)}, nil, nil, nil),
                        },
+                       setup: func(t *testing.T, fields *fields, args *args) {
+                               // Setup logger
+                               logger := testutils.ProduceTestingLogger(t)
+
+                               loggerOption := options.WithCustomLogger(logger)
+
+                               // Set the model logger to the logger above
+                               testutils.SetToTestingLogger(t, 
readWriteModel.Plc4xModelLog)
+
+                               codec := NewMessageCodec(nil, loggerOption)
+                               connection := NewConnection(codec, 
Configuration{}, DriverContext{}, nil, nil, nil, loggerOption)
+                               t.Cleanup(func() {
+                                       timer := time.NewTimer(1 * time.Second)
+                                       t.Cleanup(func() {
+                                               utils.CleanupTimer(timer)
+                                       })
+                                       select {
+                                       case <-connection.Close():
+                                       case <-timer.C:
+                                               t.Error("timeout")
+                                       }
+                               })
+                               fields.connection = connection
+                       },
                        wantAsserter: func(t *testing.T, results <-chan 
apiModel.PlcSubscriptionRequestResult) bool {
                                timer := time.NewTimer(2 * time.Second)
                                defer timer.Stop()
@@ -94,6 +119,9 @@ func TestSubscriber_Subscribe(t *testing.T) {
        }
        for _, tt := range tests {
                t.Run(tt.name, func(t *testing.T) {
+                       if tt.setup != nil {
+                               tt.setup(t, &tt.fields, &tt.args)
+                       }
                        m := &Subscriber{
                                connection: tt.fields.connection,
                                consumers:  tt.fields.consumers,
diff --git a/plc4go/spi/default/DefaultConnection.go 
b/plc4go/spi/default/DefaultConnection.go
index 492ae8be56..17f8715daa 100644
--- a/plc4go/spi/default/DefaultConnection.go
+++ b/plc4go/spi/default/DefaultConnection.go
@@ -271,10 +271,15 @@ func (d *defaultConnection) BlockingClose() {
 
 func (d *defaultConnection) Close() <-chan plc4go.PlcConnectionCloseResult {
        d.log.Trace().Msg("close connection")
-       if err := d.GetMessageCodec().Disconnect(); err != nil {
-               d.log.Warn().Err(err).Msg("Error disconnecting message code")
+       if messageCodec := d.GetMessageCodec(); messageCodec != nil {
+               if err := messageCodec.Disconnect(); err != nil {
+                       d.log.Warn().Err(err).Msg("Error disconnecting message 
code")
+               }
+       }
+       var err error
+       if transportInstance := d.GetTransportInstance(); transportInstance != 
nil {
+               err = transportInstance.Close()
        }
-       err := d.GetTransportInstance().Close()
        d.SetConnected(false)
        ch := make(chan plc4go.PlcConnectionCloseResult, 1)
        ch <- NewDefaultPlcConnectionCloseResult(d.GetConnection(), err)

Reply via email to