This is an automated email from the ASF dual-hosted git repository. sruehl pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/plc4x.git
commit 25480b1d22a08f863ba15383d364e2b29605e35c Author: Sebastian Rühl <[email protected]> AuthorDate: Fri Jun 2 15:17:00 2023 +0200 fix(plc4go): transaction should now be properly handled --- plc4go/internal/bacnetip/Connection.go | 2 +- plc4go/internal/bacnetip/Reader.go | 12 ++++++++++-- plc4go/internal/cbus/Reader.go | 4 ---- plc4go/internal/cbus/Writer.go | 4 +++- plc4go/internal/eip/Reader.go | 4 +++- plc4go/internal/eip/Writer.go | 8 ++++++-- plc4go/internal/s7/Reader.go | 4 +++- plc4go/internal/s7/Writer.go | 4 +++- 8 files changed, 29 insertions(+), 13 deletions(-) diff --git a/plc4go/internal/bacnetip/Connection.go b/plc4go/internal/bacnetip/Connection.go index 516c4b1fe5..5c19e558d9 100644 --- a/plc4go/internal/bacnetip/Connection.go +++ b/plc4go/internal/bacnetip/Connection.go @@ -127,7 +127,7 @@ func (c *Connection) GetMessageCodec() spi.MessageCodec { } func (c *Connection) ReadRequestBuilder() apiModel.PlcReadRequestBuilder { - return spiModel.NewDefaultPlcReadRequestBuilder(c.GetPlcTagHandler(), NewReader(&c.invokeIdGenerator, c.messageCodec, c.tm)) + return spiModel.NewDefaultPlcReadRequestBuilder(c.GetPlcTagHandler(), NewReader(&c.invokeIdGenerator, c.messageCodec, c.tm, options.WithCustomLogger(c.log))) } func (c *Connection) SubscriptionRequestBuilder() apiModel.PlcSubscriptionRequestBuilder { diff --git a/plc4go/internal/bacnetip/Reader.go b/plc4go/internal/bacnetip/Reader.go index e8e1f5b1cc..1ee5239bc9 100644 --- a/plc4go/internal/bacnetip/Reader.go +++ b/plc4go/internal/bacnetip/Reader.go @@ -22,7 +22,9 @@ package bacnetip import ( "context" "fmt" + "github.com/apache/plc4x/plc4go/spi/options" "github.com/apache/plc4x/plc4go/spi/transactions" + "github.com/rs/zerolog" "time" apiModel "github.com/apache/plc4x/plc4go/pkg/api/model" @@ -42,9 +44,11 @@ type Reader struct { maxSegmentsAccepted readWriteModel.MaxSegmentsAccepted maxApduLengthAccepted readWriteModel.MaxApduLengthAccepted + + log zerolog.Logger } -func NewReader(invokeIdGenerator *InvokeIdGenerator, messageCodec spi.MessageCodec, tm transactions.RequestTransactionManager) *Reader { +func NewReader(invokeIdGenerator *InvokeIdGenerator, messageCodec spi.MessageCodec, tm transactions.RequestTransactionManager, _options ...options.WithOption) *Reader { return &Reader{ invokeIdGenerator: invokeIdGenerator, messageCodec: messageCodec, @@ -52,6 +56,8 @@ func NewReader(invokeIdGenerator *InvokeIdGenerator, messageCodec spi.MessageCod maxSegmentsAccepted: readWriteModel.MaxSegmentsAccepted_MORE_THAN_64_SEGMENTS, maxApduLengthAccepted: readWriteModel.MaxApduLengthAccepted_NUM_OCTETS_1476, + + log: options.ExtractCustomLogger(_options...), } } @@ -190,7 +196,9 @@ func (m *Reader) Read(ctx context.Context, readRequest apiModel.PlcReadRequest) nil, errors.Wrap(err, "error sending message"), ) - _ = transaction.EndRequest() + if err := transaction.FailRequest(errors.Errorf("timeout after %s", time.Second*1)); err != nil { + m.log.Debug().Err(err).Msg("Error failing request") + } } }) }() diff --git a/plc4go/internal/cbus/Reader.go b/plc4go/internal/cbus/Reader.go index 2382a20e1b..e57f344224 100644 --- a/plc4go/internal/cbus/Reader.go +++ b/plc4go/internal/cbus/Reader.go @@ -156,10 +156,6 @@ func (m *Reader) sendMessageOverTheWire(ctx context.Context, transaction transac expectedAlpha := messageToSend.(readWriteModel.CBusMessageToServer).GetRequest().(interface{ GetAlpha() readWriteModel.Alpha }).GetAlpha().GetCharacter() return actualAlpha == expectedAlpha }, func(receivedMessage spi.Message) error { - defer func(transaction transactions.RequestTransaction) { - // This is just to make sure we don't forget to close the transaction here - _ = transaction.EndRequest() - }(transaction) // Convert the response into an m.log.Trace().Msg("convert response to ") messageToClient := receivedMessage.(readWriteModel.CBusMessageToClient) diff --git a/plc4go/internal/cbus/Writer.go b/plc4go/internal/cbus/Writer.go index 35b13f71dc..8dbaf855fe 100644 --- a/plc4go/internal/cbus/Writer.go +++ b/plc4go/internal/cbus/Writer.go @@ -144,7 +144,9 @@ func (m *Writer) Write(ctx context.Context, writeRequest apiModel.PlcWriteReques }, time.Second*1); err != nil { m.log.Debug().Err(err).Msgf("Error sending message for tag %s", tagNameCopy) addResponseCode(tagNameCopy, apiModel.PlcResponseCode_INTERNAL_ERROR) - _ = transaction.EndRequest() + if err := transaction.FailRequest(errors.Errorf("timeout after %s", time.Second*1)); err != nil { + m.log.Debug().Err(err).Msg("Error failing request") + } } }) } diff --git a/plc4go/internal/eip/Reader.go b/plc4go/internal/eip/Reader.go index 61b5503c6f..a38a0d2ce0 100644 --- a/plc4go/internal/eip/Reader.go +++ b/plc4go/internal/eip/Reader.go @@ -141,7 +141,9 @@ func (m *Reader) Read(ctx context.Context, readRequest apiModel.PlcReadRequest) nil, errors.Wrap(err, "error sending message"), ) - _ = transaction.EndRequest() + if err := transaction.FailRequest(errors.Errorf("timeout after %s", time.Second*1)); err != nil { + m.log.Debug().Err(err).Msg("Error failing request") + } } }) } diff --git a/plc4go/internal/eip/Writer.go b/plc4go/internal/eip/Writer.go index 52cef6d3f3..b04eb3c61d 100644 --- a/plc4go/internal/eip/Writer.go +++ b/plc4go/internal/eip/Writer.go @@ -172,7 +172,9 @@ func (m Writer) Write(ctx context.Context, writeRequest apiModel.PlcWriteRequest return transaction.EndRequest() }, time.Second*1); err != nil { result <- spiModel.NewDefaultPlcWriteRequestResult( writeRequest, nil, errors.Wrap(err, "error sending message")) - _ = transaction.EndRequest() + if err := transaction.FailRequest(errors.Errorf("timeout after %s", time.Second*1)); err != nil { + m.log.Debug().Err(err).Msg("Error failing request") + } } }) } else { @@ -263,7 +265,9 @@ func (m Writer) Write(ctx context.Context, writeRequest apiModel.PlcWriteRequest return transaction.EndRequest() }, time.Second*1); err != nil { result <- spiModel.NewDefaultPlcWriteRequestResult( writeRequest, nil, errors.Wrap(err, "error sending message")) - _ = transaction.EndRequest() + if err := transaction.FailRequest(errors.Errorf("timeout after %s", time.Second*1)); err != nil { + m.log.Debug().Err(err).Msg("Error failing request") + } } }) }*/ diff --git a/plc4go/internal/s7/Reader.go b/plc4go/internal/s7/Reader.go index 7b34d87748..59ea75bc28 100644 --- a/plc4go/internal/s7/Reader.go +++ b/plc4go/internal/s7/Reader.go @@ -161,7 +161,9 @@ func (m *Reader) Read(ctx context.Context, readRequest apiModel.PlcReadRequest) nil, errors.Wrap(err, "error sending message"), ) - _ = transaction.EndRequest() + if err := transaction.FailRequest(errors.Errorf("timeout after %s", time.Second*1)); err != nil { + m.log.Debug().Err(err).Msg("Error failing request") + } } }) }() diff --git a/plc4go/internal/s7/Writer.go b/plc4go/internal/s7/Writer.go index bb67795494..22761cd09a 100644 --- a/plc4go/internal/s7/Writer.go +++ b/plc4go/internal/s7/Writer.go @@ -149,7 +149,9 @@ func (m Writer) Write(ctx context.Context, writeRequest apiModel.PlcWriteRequest return transaction.EndRequest() }, time.Second*1); err != nil { result <- spiModel.NewDefaultPlcWriteRequestResult(writeRequest, nil, errors.Wrap(err, "error sending message")) - _ = transaction.EndRequest() + if err := transaction.FailRequest(errors.Errorf("timeout after %s", time.Second*1)); err != nil { + m.log.Debug().Err(err).Msg("Error failing request") + } } }) }()
