This is an automated email from the ASF dual-hosted git repository. sruehl pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/plc4x.git
commit d041a702ef24b473ebdee95f0baed9367f7b41b4 Author: Sebastian Rühl <[email protected]> AuthorDate: Thu Nov 13 13:29:04 2025 +0100 feat(plc4go): expire expectations on shutdown --- plc4go/internal/cbus/Browser.go | 6 ++---- plc4go/internal/cbus/Browser_test.go | 28 ++++++++++++---------------- plc4go/internal/cbus/MessageCodec.go | 4 ---- plc4go/internal/modbus/Connection.go | 6 ++++-- plc4go/internal/modbus/TagHandler.go | 23 +++++++++++++---------- plc4go/internal/simulated/Connection_test.go | 16 +++++++++++++++- plc4go/internal/simulated/Driver_test.go | 1 - plc4go/pkg/api/PlcDriverManger_test.go | 1 - plc4go/spi/default/DefaultCodec.go | 6 ++++++ plc4go/spi/default/DefaultConnection_test.go | 6 +++++- plc4go/spi/testutils/DriverTestRunner.go | 7 +------ plc4go/spi/testutils/TestUtils.go | 2 +- 12 files changed, 59 insertions(+), 47 deletions(-) diff --git a/plc4go/internal/cbus/Browser.go b/plc4go/internal/cbus/Browser.go index a351cb2944..b190372d3a 100644 --- a/plc4go/internal/cbus/Browser.go +++ b/plc4go/internal/cbus/Browser.go @@ -431,8 +431,6 @@ func (m *Browser) getInstalledUnitAddressBytes(ctx context.Context) (map[byte]an } }) - syncCtx, syncCtxCancel := context.WithTimeout(ctx, 6*time.Second) - defer syncCtxCancel() for !blockOffset0Received || !blockOffset88Received || !blockOffset176Received { select { case <-blockOffset0ReceivedChan: @@ -444,8 +442,8 @@ func (m *Browser) getInstalledUnitAddressBytes(ctx context.Context) (map[byte]an case <-blockOffset176ReceivedChan: m.log.Trace().Msg("Offset 176 received") blockOffset176Received = true - case <-syncCtx.Done(): - err = syncCtx.Err() + case <-ctx.Done(): + err = ctx.Err() m.log.Trace().Err(err).Msg("Ending prematurely") return nil, errors.Wrap(err, "error waiting for other offsets") } diff --git a/plc4go/internal/cbus/Browser_test.go b/plc4go/internal/cbus/Browser_test.go index e44ec02b78..fe6b731dd0 100644 --- a/plc4go/internal/cbus/Browser_test.go +++ b/plc4go/internal/cbus/Browser_test.go @@ -93,7 +93,7 @@ func TestBrowser_BrowseQuery(t *testing.T) { transportInstance, err := transport.CreateTransportInstance(transportUrl, nil, _options...) require.NoError(t, err) t.Cleanup(func() { - assert.NoError(t, transportInstance.Close()) + t.Log(transportInstance.Close()) }) type MockState uint8 const ( @@ -158,13 +158,13 @@ func TestBrowser_BrowseQuery(t *testing.T) { require.NoError(t, err) driver := NewDriver(_options...) t.Cleanup(func() { - assert.NoError(t, driver.Close()) + t.Log(driver.Close()) }) connectionConnectResult := <-driver.GetConnection(t.Context(), transportUrl, map[string]transports.Transport{"test": transport}, map[string][]string{}) require.NoError(t, connectionConnectResult.GetErr()) fields.connection = connectionConnectResult.GetConnection() t.Cleanup(func() { - assert.NoError(t, fields.connection.BlockingClose(t.Context())) + t.Log(fields.connection.BlockingClose(t.Context())) }) args.ctx = testutils.TestContext(t) @@ -201,7 +201,7 @@ func TestBrowser_BrowseQuery(t *testing.T) { assert.Equalf(t, tt.wantQueryResults, got1, "BrowseQuery(%v, func(), %v, \n%v\n)", tt.args.ctx, tt.args.queryName, tt.args.query) if m.connection != nil && m.connection.IsConnected() { t.Log("Closing connection") - <-m.connection.Close() + t.Log(m.connection.BlockingClose(t.Context())) } }) } @@ -247,7 +247,7 @@ func TestBrowser_browseUnitInfo(t *testing.T) { transportInstance, err := transport.CreateTransportInstance(transportUrl, nil, _options...) require.NoError(t, err) t.Cleanup(func() { - assert.NoError(t, transportInstance.Close()) + t.Log(transportInstance.Close()) }) type MockState uint8 const ( @@ -312,18 +312,13 @@ func TestBrowser_browseUnitInfo(t *testing.T) { require.NoError(t, err) driver := NewDriver(_options...) t.Cleanup(func() { - assert.NoError(t, driver.Close()) + t.Log(driver.Close()) }) connectionConnectResult := <-driver.GetConnection(t.Context(), transportUrl, map[string]transports.Transport{"test": transport}, map[string][]string{}) require.NoError(t, connectionConnectResult.GetErr()) fields.connection = connectionConnectResult.GetConnection() t.Cleanup(func() { - timer := time.NewTimer(10 * time.Second) - select { - case <-fields.connection.Close(): - case <-timer.C: - t.Error("timeout") - } + t.Log(fields.connection.BlockingClose(t.Context())) }) args.ctx = testutils.TestContext(t) @@ -361,7 +356,7 @@ func TestBrowser_browseUnitInfo(t *testing.T) { assert.Equalf(t, tt.wantQueryResults, gotQueryResults, "browseUnitInfo(%v, %v, %v, %v)", tt.args.ctx, tt.args.interceptor != nil, tt.args.queryName, tt.args.query) if m.connection != nil && m.connection.IsConnected() { t.Log("Closing connection") - <-m.connection.Close() + t.Log(m.connection.BlockingClose(t.Context())) } }) } @@ -526,7 +521,7 @@ func TestBrowser_getInstalledUnitAddressBytes(t *testing.T) { transportInstance, err := transport.CreateTransportInstance(transportUrl, map[string][]string{"simulatedLatency": {"10ms"}}, _options...) require.NoError(t, err) t.Cleanup(func() { - assert.NoError(t, transportInstance.Close()) + t.Log(transportInstance.Close()) }) type MockState uint8 const ( @@ -588,13 +583,13 @@ func TestBrowser_getInstalledUnitAddressBytes(t *testing.T) { require.NoError(t, transport.AddPreregisteredInstances(transportUrl, transportInstance)) driver := NewDriver(_options...) t.Cleanup(func() { - assert.NoError(t, driver.Close()) + t.Log(driver.Close()) }) connectionConnectResult := <-driver.GetConnection(t.Context(), transportUrl, map[string]transports.Transport{"test": transport}, map[string][]string{}) require.NoError(t, connectionConnectResult.GetErr()) fields.connection = connectionConnectResult.GetConnection() t.Cleanup(func() { - assert.NoError(t, fields.connection.BlockingClose(t.Context())) + t.Log(fields.connection.BlockingClose(t.Context())) }) args.ctx = testutils.TestContext(t) @@ -631,6 +626,7 @@ func TestBrowser_getInstalledUnitAddressBytes(t *testing.T) { return } assert.Equalf(t, tt.want, got, "getInstalledUnitAddressBytes(%v)", tt.args.ctx) + t.Log("Banananaaaa") }) } } diff --git a/plc4go/internal/cbus/MessageCodec.go b/plc4go/internal/cbus/MessageCodec.go index e1b971920e..f1e266674f 100644 --- a/plc4go/internal/cbus/MessageCodec.go +++ b/plc4go/internal/cbus/MessageCodec.go @@ -134,18 +134,15 @@ func (m *MessageCodec) Receive(ctx context.Context) (spi.Message, error) { // Fill the buffer { if err := ti.FillBuffer(ctx, func(pos uint, currentByte byte, reader transports.ExtendedReader) bool { - m.log.Trace().Uint("pos", pos).Uint8("byte", currentByte).Str("rune", string(rune(currentByte))).Msg("current byte") switch currentByte { case readWriteModel.ResponseTermination_CR, readWriteModel.ResponseTermination_LF: - m.log.Trace().Msg("Found termination byte") return false case byte(readWriteModel.ConfirmationType_CONFIRMATION_SUCCESSFUL): confirmation = true // In case we have directly more data in the buffer after a confirmation _, err := reader.Peek(int(pos + 1)) - m.log.Trace().Err(err).Msg("Peeking one more") return err == nil case byte(readWriteModel.ConfirmationType_NOT_TRANSMITTED_TO_MANY_RE_TRANSMISSIONS), @@ -154,7 +151,6 @@ func (m *MessageCodec) Receive(ctx context.Context) (spi.Message, error) { byte(readWriteModel.ConfirmationType_NOT_TRANSMITTED_TOO_LONG), byte(readWriteModel.ConfirmationType_CHECKSUM_FAILURE): confirmation = true - m.log.Trace().Msg("Found confirmation") return false default: return true diff --git a/plc4go/internal/modbus/Connection.go b/plc4go/internal/modbus/Connection.go index aad444d21e..b02c41801f 100644 --- a/plc4go/internal/modbus/Connection.go +++ b/plc4go/internal/modbus/Connection.go @@ -77,8 +77,10 @@ func NewConnection(unitIdentifier uint8, messageCodec spi.MessageCodec, connecti } } connection.DefaultConnection = _default.NewDefaultConnection(connection, - _default.WithPlcTagHandler(tagHandler), - _default.WithPlcValueHandler(NewValueHandler(_options...)), + append(_options, + _default.WithPlcTagHandler(tagHandler), + _default.WithPlcValueHandler(NewValueHandler(_options...)), + )..., ) return connection } diff --git a/plc4go/internal/modbus/TagHandler.go b/plc4go/internal/modbus/TagHandler.go index b5705158f2..d1e8d00325 100644 --- a/plc4go/internal/modbus/TagHandler.go +++ b/plc4go/internal/modbus/TagHandler.go @@ -60,6 +60,8 @@ type TagHandler struct { plc4xExtendedRegisterPattern *regexp.Regexp numericExtendedRegisterPattern *regexp.Regexp + options []options.WithOption + log zerolog.Logger } @@ -78,6 +80,7 @@ func NewTagHandler(_options ...options.WithOption) TagHandler { numericHoldingRegisterPattern: regexp.MustCompile("^4[xX]?" + generalFixedDigitAddressPattern), plc4xExtendedRegisterPattern: regexp.MustCompile("^extended-register:" + generalAddressPattern), numericExtendedRegisterPattern: regexp.MustCompile("^6[xX]?" + generalFixedDigitAddressPattern), + options: _options, log: customLogger, } } @@ -88,61 +91,61 @@ func (m TagHandler) ParseTag(tagAddress string) (apiModel.PlcTag, error) { if !ok { return nil, errors.Errorf("Unknown type %s", match["datatype"]) } - return NewModbusPlcTagFromStrings(Coil, match["address"], match["quantity"], typeByName) + return NewModbusPlcTagFromStrings(Coil, match["address"], match["quantity"], typeByName, m.options...) } else if match := utils.GetSubgroupMatches(m.numericCoilPattern, tagAddress); match != nil { typeByName, ok := readWriteModel.ModbusDataTypeByName(match["datatype"]) if !ok { return nil, errors.Errorf("Unknown type %s", match["datatype"]) } - return NewModbusPlcTagFromStrings(Coil, match["address"], match["quantity"], typeByName) + return NewModbusPlcTagFromStrings(Coil, match["address"], match["quantity"], typeByName, m.options...) } else if match := utils.GetSubgroupMatches(m.plc4xDiscreteInputPattern, tagAddress); match != nil { typeByName, ok := readWriteModel.ModbusDataTypeByName(match["datatype"]) if !ok { return nil, errors.Errorf("Unknown type %s", match["datatype"]) } - return NewModbusPlcTagFromStrings(DiscreteInput, match["address"], match["quantity"], typeByName) + return NewModbusPlcTagFromStrings(DiscreteInput, match["address"], match["quantity"], typeByName, m.options...) } else if match := utils.GetSubgroupMatches(m.numericDiscreteInputPattern, tagAddress); match != nil { typeByName, ok := readWriteModel.ModbusDataTypeByName(match["datatype"]) if !ok { return nil, errors.Errorf("Unknown type %s", match["datatype"]) } - return NewModbusPlcTagFromStrings(DiscreteInput, match["address"], match["quantity"], typeByName) + return NewModbusPlcTagFromStrings(DiscreteInput, match["address"], match["quantity"], typeByName, m.options...) } else if match := utils.GetSubgroupMatches(m.plc4xInputRegisterPattern, tagAddress); match != nil { typeByName, ok := readWriteModel.ModbusDataTypeByName(match["datatype"]) if !ok { return nil, errors.Errorf("Unknown type %s", match["datatype"]) } - return NewModbusPlcTagFromStrings(InputRegister, match["address"], match["quantity"], typeByName) + return NewModbusPlcTagFromStrings(InputRegister, match["address"], match["quantity"], typeByName, m.options...) } else if match := utils.GetSubgroupMatches(m.numericInputRegisterPattern, tagAddress); match != nil { typeByName, ok := readWriteModel.ModbusDataTypeByName(match["datatype"]) if !ok { return nil, errors.Errorf("Unknown type %s", match["datatype"]) } - return NewModbusPlcTagFromStrings(InputRegister, match["address"], match["quantity"], typeByName) + return NewModbusPlcTagFromStrings(InputRegister, match["address"], match["quantity"], typeByName, m.options...) } else if match := utils.GetSubgroupMatches(m.plc4xHoldingRegisterPattern, tagAddress); match != nil { typeByName, ok := readWriteModel.ModbusDataTypeByName(match["datatype"]) if !ok { return nil, errors.Errorf("Unknown type %s", match["datatype"]) } - return NewModbusPlcTagFromStrings(HoldingRegister, match["address"], match["quantity"], typeByName) + return NewModbusPlcTagFromStrings(HoldingRegister, match["address"], match["quantity"], typeByName, m.options...) } else if match := utils.GetSubgroupMatches(m.numericHoldingRegisterPattern, tagAddress); match != nil { typeByName, ok := readWriteModel.ModbusDataTypeByName(match["datatype"]) if !ok { return nil, errors.Errorf("Unknown type %s", match["datatype"]) } - return NewModbusPlcTagFromStrings(HoldingRegister, match["address"], match["quantity"], typeByName) + return NewModbusPlcTagFromStrings(HoldingRegister, match["address"], match["quantity"], typeByName, m.options...) } else if match := utils.GetSubgroupMatches(m.plc4xExtendedRegisterPattern, tagAddress); match != nil { typeByName, ok := readWriteModel.ModbusDataTypeByName(match["datatype"]) if !ok { return nil, errors.Errorf("Unknown type %s", match["datatype"]) } - return NewModbusPlcTagFromStrings(ExtendedRegister, match["address"], match["quantity"], typeByName) + return NewModbusPlcTagFromStrings(ExtendedRegister, match["address"], match["quantity"], typeByName, m.options...) } else if match := utils.GetSubgroupMatches(m.numericExtendedRegisterPattern, tagAddress); match != nil { typeByName, ok := readWriteModel.ModbusDataTypeByName(match["datatype"]) if !ok { return nil, errors.Errorf("Unknown type %s", match["datatype"]) } - return NewModbusPlcTagFromStrings(ExtendedRegister, match["address"], match["quantity"], typeByName) + return NewModbusPlcTagFromStrings(ExtendedRegister, match["address"], match["quantity"], typeByName, m.options...) } return nil, errors.Errorf("Invalid address format for address '%s'", tagAddress) } diff --git a/plc4go/internal/simulated/Connection_test.go b/plc4go/internal/simulated/Connection_test.go index c7b96c4fd5..6531b2a0ac 100644 --- a/plc4go/internal/simulated/Connection_test.go +++ b/plc4go/internal/simulated/Connection_test.go @@ -20,6 +20,7 @@ package simulated import ( + "context" "sync" "testing" "time" @@ -274,9 +275,13 @@ func TestConnection_BlockingClose(t *testing.T) { options map[string][]string connected bool } + type args struct { + ctx context.Context + } tests := []struct { name string fields fields + args args delayAtLeast time.Duration }{ { @@ -288,6 +293,9 @@ func TestConnection_BlockingClose(t *testing.T) { options: map[string][]string{}, connected: true, }, + args: args{ + ctx: t.Context(), + }, delayAtLeast: 0, }, { @@ -299,6 +307,9 @@ func TestConnection_BlockingClose(t *testing.T) { options: map[string][]string{}, connected: false, }, + args: args{ + ctx: t.Context(), + }, delayAtLeast: 0, }, { @@ -312,6 +323,9 @@ func TestConnection_BlockingClose(t *testing.T) { }, connected: true, }, + args: args{ + ctx: t.Context(), + }, delayAtLeast: 1000, }, } @@ -330,7 +344,7 @@ func TestConnection_BlockingClose(t *testing.T) { var wg sync.WaitGroup t.Cleanup(wg.Wait) wg.Go(func() { - c.BlockingClose() + t.Log(c.BlockingClose(tt.args.ctx)) ch <- true }) return ch diff --git a/plc4go/internal/simulated/Driver_test.go b/plc4go/internal/simulated/Driver_test.go index 47cc8dafd2..ead907765c 100644 --- a/plc4go/internal/simulated/Driver_test.go +++ b/plc4go/internal/simulated/Driver_test.go @@ -22,7 +22,6 @@ package simulated import ( "net/url" "testing" - "time" apiModel "github.com/apache/plc4x/plc4go/pkg/api/model" "github.com/apache/plc4x/plc4go/spi/options" diff --git a/plc4go/pkg/api/PlcDriverManger_test.go b/plc4go/pkg/api/PlcDriverManger_test.go index 779297a128..192b4aba50 100644 --- a/plc4go/pkg/api/PlcDriverManger_test.go +++ b/plc4go/pkg/api/PlcDriverManger_test.go @@ -23,7 +23,6 @@ import ( "context" "os" "testing" - "time" "github.com/pkg/errors" "github.com/rs/zerolog" diff --git a/plc4go/spi/default/DefaultCodec.go b/plc4go/spi/default/DefaultCodec.go index f3963222aa..a690d8adf9 100644 --- a/plc4go/spi/default/DefaultCodec.go +++ b/plc4go/spi/default/DefaultCodec.go @@ -190,6 +190,12 @@ func (m *defaultCodec) Disconnect() error { return errors.Wrap(err, "error closing transport instance") } } + for _, expectation := range m.expectations { + m.wg.Go(func() { + _ = expectation.GetHandleError()(errors.New("disconnected")) + }) + } + m.wg.Wait() m.log.Trace().Msg("disconnected") return nil } diff --git a/plc4go/spi/default/DefaultConnection_test.go b/plc4go/spi/default/DefaultConnection_test.go index a33eece6a8..935bd0d343 100644 --- a/plc4go/spi/default/DefaultConnection_test.go +++ b/plc4go/spi/default/DefaultConnection_test.go @@ -25,6 +25,7 @@ import ( "testing" "time" + "github.com/apache/plc4x/plc4go/spi/testutils" "github.com/rs/zerolog/log" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" @@ -416,7 +417,9 @@ func Test_defaultConnection_BlockingClose(t *testing.T) { setup: func(t *testing.T, fields *fields, args *args) { requirements := NewMockDefaultConnectionRequirements(t) connection := NewMockPlcConnection(t) - connection.EXPECT().Close().Return(nil) + ch := make(chan plc4go.PlcConnectionCloseResult, 1) + ch <- NewDefaultPlcConnectionCloseResult(connection, nil) + connection.EXPECT().Close().Return(ch) requirements.EXPECT().GetConnection().Return(connection) fields.DefaultConnectionRequirements = requirements var cancelFunc context.CancelFunc @@ -435,6 +438,7 @@ func Test_defaultConnection_BlockingClose(t *testing.T) { DefaultConnectionRequirements: tt.fields.DefaultConnectionRequirements, tagHandler: tt.fields.tagHandler, valueHandler: tt.fields.valueHandler, + log: testutils.ProduceTestingLogger(t), } tt.wantErr(t, d.BlockingClose(tt.args.ctx)) }) diff --git a/plc4go/spi/testutils/DriverTestRunner.go b/plc4go/spi/testutils/DriverTestRunner.go index cacf6dbcf9..a29ac59f18 100644 --- a/plc4go/spi/testutils/DriverTestRunner.go +++ b/plc4go/spi/testutils/DriverTestRunner.go @@ -119,12 +119,7 @@ func (m DriverTestsuite) Run(t *testing.T, driverManager plc4go.PlcDriverManager } connection := connectionResult.GetConnection() t.Cleanup(func() { - select { - case result := <-connection.Close(): - assert.NoError(t, result.GetErr()) - case <-t.Context().Done(): - t.Error("timeout closing connection") - } + assert.NoError(t, connection.BlockingClose(ctx)) }) utils.NewAsciiBoxWriter() m.LogDelimiterSection(t, "=", "Executing testcase: %s", testcase.name) diff --git a/plc4go/spi/testutils/TestUtils.go b/plc4go/spi/testutils/TestUtils.go index d87f968ec6..23b98bf7b2 100644 --- a/plc4go/spi/testutils/TestUtils.go +++ b/plc4go/spi/testutils/TestUtils.go @@ -144,7 +144,7 @@ func init() { zerolog.TimeFieldFormat = time.RFC3339Nano } getOrLeaveBool("PLC4X_TEST_PASS_LOGGER_TO_MODEL", &passLoggerToModel) - receiveTimeout = 3 * time.Second + receiveTimeout = 60 * time.Second getOrLeaveDuration("PLC4X_TEST_RECEIVE_TIMEOUT_MS", &receiveTimeout) getOrLeaveBool("PLC4X_TEST_TRACE_TRANSACTION_MANAGER_WORKERS", &traceTransactionManagerWorkers) getOrLeaveBool("PLC4X_TEST_TRACE_TRANSACTION_MANAGER_TRANSACTIONS", &traceTransactionManagerTransactions)
