This is an automated email from the ASF dual-hosted git repository.

sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git

commit 25480b1d22a08f863ba15383d364e2b29605e35c
Author: Sebastian Rühl <[email protected]>
AuthorDate: Fri Jun 2 15:17:00 2023 +0200

    fix(plc4go): transaction should now be properly handled
---
 plc4go/internal/bacnetip/Connection.go |  2 +-
 plc4go/internal/bacnetip/Reader.go     | 12 ++++++++++--
 plc4go/internal/cbus/Reader.go         |  4 ----
 plc4go/internal/cbus/Writer.go         |  4 +++-
 plc4go/internal/eip/Reader.go          |  4 +++-
 plc4go/internal/eip/Writer.go          |  8 ++++++--
 plc4go/internal/s7/Reader.go           |  4 +++-
 plc4go/internal/s7/Writer.go           |  4 +++-
 8 files changed, 29 insertions(+), 13 deletions(-)

diff --git a/plc4go/internal/bacnetip/Connection.go b/plc4go/internal/bacnetip/Connection.go
index 516c4b1fe5..5c19e558d9 100644
--- a/plc4go/internal/bacnetip/Connection.go
+++ b/plc4go/internal/bacnetip/Connection.go
@@ -127,7 +127,7 @@ func (c *Connection) GetMessageCodec() spi.MessageCodec {
 }
 
 func (c *Connection) ReadRequestBuilder() apiModel.PlcReadRequestBuilder {
-       return spiModel.NewDefaultPlcReadRequestBuilder(c.GetPlcTagHandler(), 
NewReader(&c.invokeIdGenerator, c.messageCodec, c.tm))
+       return spiModel.NewDefaultPlcReadRequestBuilder(c.GetPlcTagHandler(), 
NewReader(&c.invokeIdGenerator, c.messageCodec, c.tm, 
options.WithCustomLogger(c.log)))
 }
 
 func (c *Connection) SubscriptionRequestBuilder() 
apiModel.PlcSubscriptionRequestBuilder {
diff --git a/plc4go/internal/bacnetip/Reader.go b/plc4go/internal/bacnetip/Reader.go
index e8e1f5b1cc..1ee5239bc9 100644
--- a/plc4go/internal/bacnetip/Reader.go
+++ b/plc4go/internal/bacnetip/Reader.go
@@ -22,7 +22,9 @@ package bacnetip
 import (
        "context"
        "fmt"
+       "github.com/apache/plc4x/plc4go/spi/options"
        "github.com/apache/plc4x/plc4go/spi/transactions"
+       "github.com/rs/zerolog"
        "time"
 
        apiModel "github.com/apache/plc4x/plc4go/pkg/api/model"
@@ -42,9 +44,11 @@ type Reader struct {
 
        maxSegmentsAccepted   readWriteModel.MaxSegmentsAccepted
        maxApduLengthAccepted readWriteModel.MaxApduLengthAccepted
+
+       log zerolog.Logger
 }
 
-func NewReader(invokeIdGenerator *InvokeIdGenerator, messageCodec 
spi.MessageCodec, tm transactions.RequestTransactionManager) *Reader {
+func NewReader(invokeIdGenerator *InvokeIdGenerator, messageCodec 
spi.MessageCodec, tm transactions.RequestTransactionManager, _options 
...options.WithOption) *Reader {
        return &Reader{
                invokeIdGenerator: invokeIdGenerator,
                messageCodec:      messageCodec,
@@ -52,6 +56,8 @@ func NewReader(invokeIdGenerator *InvokeIdGenerator, messageCodec spi.MessageCod
 
                maxSegmentsAccepted:   
readWriteModel.MaxSegmentsAccepted_MORE_THAN_64_SEGMENTS,
                maxApduLengthAccepted: 
readWriteModel.MaxApduLengthAccepted_NUM_OCTETS_1476,
+
+               log: options.ExtractCustomLogger(_options...),
        }
 }
 
@@ -190,7 +196,9 @@ func (m *Reader) Read(ctx context.Context, readRequest apiModel.PlcReadRequest)
                                        nil,
                                        errors.Wrap(err, "error sending 
message"),
                                )
-                               _ = transaction.EndRequest()
+                               if err := 
transaction.FailRequest(errors.Errorf("timeout after %s", time.Second*1)); err 
!= nil {
+                                       m.log.Debug().Err(err).Msg("Error 
failing request")
+                               }
                        }
                })
        }()
diff --git a/plc4go/internal/cbus/Reader.go b/plc4go/internal/cbus/Reader.go
index 2382a20e1b..e57f344224 100644
--- a/plc4go/internal/cbus/Reader.go
+++ b/plc4go/internal/cbus/Reader.go
@@ -156,10 +156,6 @@ func (m *Reader) sendMessageOverTheWire(ctx context.Context, transaction transac
                expectedAlpha := 
messageToSend.(readWriteModel.CBusMessageToServer).GetRequest().(interface{ 
GetAlpha() readWriteModel.Alpha }).GetAlpha().GetCharacter()
                return actualAlpha == expectedAlpha
        }, func(receivedMessage spi.Message) error {
-               defer func(transaction transactions.RequestTransaction) {
-                       // This is just to make sure we don't forget to close 
the transaction here
-                       _ = transaction.EndRequest()
-               }(transaction)
                // Convert the response into an
                m.log.Trace().Msg("convert response to ")
                messageToClient := 
receivedMessage.(readWriteModel.CBusMessageToClient)
diff --git a/plc4go/internal/cbus/Writer.go b/plc4go/internal/cbus/Writer.go
index 35b13f71dc..8dbaf855fe 100644
--- a/plc4go/internal/cbus/Writer.go
+++ b/plc4go/internal/cbus/Writer.go
@@ -144,7 +144,9 @@ func (m *Writer) Write(ctx context.Context, writeRequest apiModel.PlcWriteReques
                                }, time.Second*1); err != nil {
                                        m.log.Debug().Err(err).Msgf("Error 
sending message for tag %s", tagNameCopy)
                                        addResponseCode(tagNameCopy, 
apiModel.PlcResponseCode_INTERNAL_ERROR)
-                                       _ = transaction.EndRequest()
+                                       if err := 
transaction.FailRequest(errors.Errorf("timeout after %s", time.Second*1)); err 
!= nil {
+                                               
m.log.Debug().Err(err).Msg("Error failing request")
+                                       }
                                }
                        })
                }
diff --git a/plc4go/internal/eip/Reader.go b/plc4go/internal/eip/Reader.go
index 61b5503c6f..a38a0d2ce0 100644
--- a/plc4go/internal/eip/Reader.go
+++ b/plc4go/internal/eip/Reader.go
@@ -141,7 +141,9 @@ func (m *Reader) Read(ctx context.Context, readRequest apiModel.PlcReadRequest)
                                                nil,
                                                errors.Wrap(err, "error sending 
message"),
                                        )
-                                       _ = transaction.EndRequest()
+                                       if err := 
transaction.FailRequest(errors.Errorf("timeout after %s", time.Second*1)); err 
!= nil {
+                                               
m.log.Debug().Err(err).Msg("Error failing request")
+                                       }
                                }
                        })
                }
diff --git a/plc4go/internal/eip/Writer.go b/plc4go/internal/eip/Writer.go
index 52cef6d3f3..b04eb3c61d 100644
--- a/plc4go/internal/eip/Writer.go
+++ b/plc4go/internal/eip/Writer.go
@@ -172,7 +172,9 @@ func (m Writer) Write(ctx context.Context, writeRequest apiModel.PlcWriteRequest
                                                        return 
transaction.EndRequest()
                                                }, time.Second*1); err != nil {
                                                        result <- 
spiModel.NewDefaultPlcWriteRequestResult( writeRequest, nil,      
errors.Wrap(err, "error sending message"))
-                                                       _ = 
transaction.EndRequest()
+                                                       if err := 
transaction.FailRequest(errors.Errorf("timeout after %s", time.Second*1)); err 
!= nil {
+                                                               
m.log.Debug().Err(err).Msg("Error failing request")
+                                                       }
                                                }
                                        })
                                } else {
@@ -263,7 +265,9 @@ func (m Writer) Write(ctx context.Context, writeRequest apiModel.PlcWriteRequest
                                                        return 
transaction.EndRequest()
                                                }, time.Second*1); err != nil {
                                                        result <- 
spiModel.NewDefaultPlcWriteRequestResult( writeRequest, nil,      
errors.Wrap(err, "error sending message"))
-                                                       _ = 
transaction.EndRequest()
+                                                       if err := 
transaction.FailRequest(errors.Errorf("timeout after %s", time.Second*1)); err 
!= nil {
+                                                               
m.log.Debug().Err(err).Msg("Error failing request")
+                                                       }
                                                }
                                        })
                                }*/
diff --git a/plc4go/internal/s7/Reader.go b/plc4go/internal/s7/Reader.go
index 7b34d87748..59ea75bc28 100644
--- a/plc4go/internal/s7/Reader.go
+++ b/plc4go/internal/s7/Reader.go
@@ -161,7 +161,9 @@ func (m *Reader) Read(ctx context.Context, readRequest apiModel.PlcReadRequest)
                                        nil,
                                        errors.Wrap(err, "error sending 
message"),
                                )
-                               _ = transaction.EndRequest()
+                               if err := 
transaction.FailRequest(errors.Errorf("timeout after %s", time.Second*1)); err 
!= nil {
+                                       m.log.Debug().Err(err).Msg("Error 
failing request")
+                               }
                        }
                })
        }()
diff --git a/plc4go/internal/s7/Writer.go b/plc4go/internal/s7/Writer.go
index bb67795494..22761cd09a 100644
--- a/plc4go/internal/s7/Writer.go
+++ b/plc4go/internal/s7/Writer.go
@@ -149,7 +149,9 @@ func (m Writer) Write(ctx context.Context, writeRequest apiModel.PlcWriteRequest
                                return transaction.EndRequest()
                        }, time.Second*1); err != nil {
                                result <- 
spiModel.NewDefaultPlcWriteRequestResult(writeRequest, nil, errors.Wrap(err, 
"error sending message"))
-                               _ = transaction.EndRequest()
+                               if err := 
transaction.FailRequest(errors.Errorf("timeout after %s", time.Second*1)); err 
!= nil {
+                                       m.log.Debug().Err(err).Msg("Error 
failing request")
+                               }
                        }
                })
        }()

Reply via email to