This is an automated email from the ASF dual-hosted git repository.
sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git
The following commit(s) were added to refs/heads/develop by this push:
new ab70d1b11a feat(plc4go/spi): introduce interfaces for request
transaction manager
ab70d1b11a is described below
commit ab70d1b11a264209515e4f1c9e7c6b03d68ccbd8
Author: Sebastian Rühl <[email protected]>
AuthorDate: Wed Apr 5 16:25:36 2023 +0200
feat(plc4go/spi): introduce interfaces for request transaction manager
---
plc4go/internal/bacnetip/Connection.go | 4 +-
plc4go/internal/bacnetip/Driver.go | 4 +-
plc4go/internal/bacnetip/Reader.go | 4 +-
plc4go/internal/cbus/CBusMessageMapper.go | 2 +-
plc4go/internal/cbus/CBusMessageMapper_test.go | 6 +-
plc4go/internal/cbus/Connection.go | 4 +-
plc4go/internal/cbus/Driver.go | 4 +-
plc4go/internal/cbus/Reader.go | 6 +-
plc4go/internal/cbus/Writer.go | 4 +-
plc4go/internal/eip/Connection.go | 4 +-
plc4go/internal/eip/EipDriver.go | 4 +-
plc4go/internal/eip/Reader.go | 4 +-
plc4go/internal/eip/Writer.go | 4 +-
plc4go/internal/s7/Connection.go | 4 +-
plc4go/internal/s7/Driver.go | 4 +-
plc4go/internal/s7/Reader.go | 4 +-
plc4go/internal/s7/Writer.go | 4 +-
plc4go/spi/RequestTransactionManager.go | 124 +++++++++++++++----------
plc4go/spi/testutils/DriverTestRunner.go | 2 +
19 files changed, 112 insertions(+), 84 deletions(-)
diff --git a/plc4go/internal/bacnetip/Connection.go
b/plc4go/internal/bacnetip/Connection.go
index 30113f1a14..7354d61fe3 100644
--- a/plc4go/internal/bacnetip/Connection.go
+++ b/plc4go/internal/bacnetip/Connection.go
@@ -39,13 +39,13 @@ type Connection struct {
invokeIdGenerator InvokeIdGenerator
messageCodec spi.MessageCodec
subscribers []*Subscriber
- tm *spi.RequestTransactionManager
+ tm spi.RequestTransactionManager
connectionId string
tracer *spi.Tracer
}
-func NewConnection(messageCodec spi.MessageCodec, tagHandler
spi.PlcTagHandler, tm *spi.RequestTransactionManager, options
map[string][]string) *Connection {
+func NewConnection(messageCodec spi.MessageCodec, tagHandler
spi.PlcTagHandler, tm spi.RequestTransactionManager, options
map[string][]string) *Connection {
connection := &Connection{
invokeIdGenerator: InvokeIdGenerator{currentInvokeId: 0},
messageCodec: messageCodec,
diff --git a/plc4go/internal/bacnetip/Driver.go
b/plc4go/internal/bacnetip/Driver.go
index 2803a73c58..afa7fbe606 100644
--- a/plc4go/internal/bacnetip/Driver.go
+++ b/plc4go/internal/bacnetip/Driver.go
@@ -53,7 +53,7 @@ func NewDriver() plc4go.PlcDriver {
applicationManager: ApplicationManager{
applications:
map[string]*ApplicationLayerMessageCodec{},
},
- tm:
*spi.NewRequestTransactionManager(math.MaxInt),
+ tm:
spi.NewRequestTransactionManager(math.MaxInt),
awaitSetupComplete: true,
awaitDisconnectComplete: true,
}
@@ -103,7 +103,7 @@ func (m *Driver) GetConnectionWithContext(ctx
context.Context, transportUrl url.
log.Debug().Msgf("working with codec %#v", codec)
// Create the new connection
- connection := NewConnection(codec, m.GetPlcTagHandler(), &m.tm, options)
+ connection := NewConnection(codec, m.GetPlcTagHandler(), m.tm, options)
log.Debug().Msg("created connection, connecting now")
return connection.ConnectWithContext(ctx)
}
diff --git a/plc4go/internal/bacnetip/Reader.go
b/plc4go/internal/bacnetip/Reader.go
index f965017fd8..d67378150e 100644
--- a/plc4go/internal/bacnetip/Reader.go
+++ b/plc4go/internal/bacnetip/Reader.go
@@ -37,13 +37,13 @@ import (
type Reader struct {
invokeIdGenerator *InvokeIdGenerator
messageCodec spi.MessageCodec
- tm *spi.RequestTransactionManager
+ tm spi.RequestTransactionManager
maxSegmentsAccepted readWriteModel.MaxSegmentsAccepted
maxApduLengthAccepted readWriteModel.MaxApduLengthAccepted
}
-func NewReader(invokeIdGenerator *InvokeIdGenerator, messageCodec
spi.MessageCodec, tm *spi.RequestTransactionManager) *Reader {
+func NewReader(invokeIdGenerator *InvokeIdGenerator, messageCodec
spi.MessageCodec, tm spi.RequestTransactionManager) *Reader {
return &Reader{
invokeIdGenerator: invokeIdGenerator,
messageCodec: messageCodec,
diff --git a/plc4go/internal/cbus/CBusMessageMapper.go
b/plc4go/internal/cbus/CBusMessageMapper.go
index 672a268bc6..8c9d963c7d 100644
--- a/plc4go/internal/cbus/CBusMessageMapper.go
+++ b/plc4go/internal/cbus/CBusMessageMapper.go
@@ -252,7 +252,7 @@ func producePointToPointCommand(unitAddress
readWriteModel.UnitAddress, bridgeAd
return readWriteModel.NewCBusPointToPointCommandDirect(unitAddress,
0x0000, calData, cbusOptions), nil
}
-func MapEncodedReply(transaction *spi.RequestTransaction, encodedReply
readWriteModel.EncodedReply, tagName string, addResponseCode func(name string,
responseCode apiModel.PlcResponseCode), addPlcValue func(name string, plcValue
apiValues.PlcValue)) error {
+func MapEncodedReply(transaction spi.RequestTransaction, encodedReply
readWriteModel.EncodedReply, tagName string, addResponseCode func(name string,
responseCode apiModel.PlcResponseCode), addPlcValue func(name string, plcValue
apiValues.PlcValue)) error {
switch reply := encodedReply.(type) {
case readWriteModel.EncodedReplyCALReplyExactly:
calData := reply.GetCalReply().GetCalData()
diff --git a/plc4go/internal/cbus/CBusMessageMapper_test.go
b/plc4go/internal/cbus/CBusMessageMapper_test.go
index 2a206ac804..d9987f2ed0 100644
--- a/plc4go/internal/cbus/CBusMessageMapper_test.go
+++ b/plc4go/internal/cbus/CBusMessageMapper_test.go
@@ -292,7 +292,7 @@ func TestTagToCBusMessage(t *testing.T) {
func TestMapEncodedReply(t *testing.T) {
type args struct {
- transaction *spi.RequestTransaction
+ transaction spi.RequestTransaction
encodedReply readWriteModel.EncodedReply
tagName string
addResponseCode func(name string, responseCode
apiModel.PlcResponseCode)
@@ -306,7 +306,7 @@ func TestMapEncodedReply(t *testing.T) {
{
name: "empty input",
args: args{
- transaction: func() *spi.RequestTransaction {
+ transaction: func() spi.RequestTransaction {
transactionManager :=
spi.NewRequestTransactionManager(1)
transaction :=
transactionManager.StartTransaction()
transaction.Submit(func() {
@@ -323,7 +323,7 @@ func TestMapEncodedReply(t *testing.T) {
{
name: "CALDataStatus",
args: args{
- transaction: func() *spi.RequestTransaction {
+ transaction: func() spi.RequestTransaction {
transactionManager :=
spi.NewRequestTransactionManager(1)
transaction :=
transactionManager.StartTransaction()
transaction.Submit(func() {
diff --git a/plc4go/internal/cbus/Connection.go
b/plc4go/internal/cbus/Connection.go
index 91062f1baa..4692f94ee5 100644
--- a/plc4go/internal/cbus/Connection.go
+++ b/plc4go/internal/cbus/Connection.go
@@ -58,7 +58,7 @@ type Connection struct {
alphaGenerator AlphaGenerator
messageCodec spi.MessageCodec
subscribers []*Subscriber
- tm *spi.RequestTransactionManager
+ tm spi.RequestTransactionManager
configuration Configuration
driverContext DriverContext
@@ -67,7 +67,7 @@ type Connection struct {
tracer *spi.Tracer
}
-func NewConnection(messageCodec spi.MessageCodec, configuration Configuration,
driverContext DriverContext, tagHandler spi.PlcTagHandler, tm
*spi.RequestTransactionManager, options map[string][]string) *Connection {
+func NewConnection(messageCodec spi.MessageCodec, configuration Configuration,
driverContext DriverContext, tagHandler spi.PlcTagHandler, tm
spi.RequestTransactionManager, options map[string][]string) *Connection {
connection := &Connection{
alphaGenerator: AlphaGenerator{currentAlpha: 'g'},
messageCodec: messageCodec,
diff --git a/plc4go/internal/cbus/Driver.go b/plc4go/internal/cbus/Driver.go
index 9dd7302765..3a4facc222 100644
--- a/plc4go/internal/cbus/Driver.go
+++ b/plc4go/internal/cbus/Driver.go
@@ -44,7 +44,7 @@ type Driver struct {
func NewDriver() plc4go.PlcDriver {
driver := &Driver{
- tm: *spi.NewRequestTransactionManager(1),
+ tm: spi.NewRequestTransactionManager(1),
awaitSetupComplete: true,
awaitDisconnectComplete: true,
}
@@ -103,7 +103,7 @@ func (m *Driver) GetConnectionWithContext(ctx
context.Context, transportUrl url.
driverContext.awaitDisconnectComplete = m.awaitDisconnectComplete
// Create the new connection
- connection := NewConnection(codec, configuration, driverContext,
m.GetPlcTagHandler(), &m.tm, options)
+ connection := NewConnection(codec, configuration, driverContext,
m.GetPlcTagHandler(), m.tm, options)
log.Debug().Msg("created connection, connecting now")
return connection.ConnectWithContext(ctx)
}
diff --git a/plc4go/internal/cbus/Reader.go b/plc4go/internal/cbus/Reader.go
index 7b6cf2dfe6..952f639e23 100644
--- a/plc4go/internal/cbus/Reader.go
+++ b/plc4go/internal/cbus/Reader.go
@@ -36,10 +36,10 @@ import (
type Reader struct {
alphaGenerator *AlphaGenerator
messageCodec spi.MessageCodec
- tm *spi.RequestTransactionManager
+ tm spi.RequestTransactionManager
}
-func NewReader(tpduGenerator *AlphaGenerator, messageCodec spi.MessageCodec,
tm *spi.RequestTransactionManager) *Reader {
+func NewReader(tpduGenerator *AlphaGenerator, messageCodec spi.MessageCodec,
tm spi.RequestTransactionManager) *Reader {
return &Reader{
alphaGenerator: tpduGenerator,
messageCodec: messageCodec,
@@ -135,7 +135,7 @@ func (m *Reader) readSync(ctx context.Context, readRequest
apiModel.PlcReadReque
}
return
confirmation.GetConfirmation().GetAlpha().GetCharacter() ==
messageToSend.(readWriteModel.CBusMessageToServer).GetRequest().(readWriteModel.RequestCommand).GetAlpha().GetCharacter()
}, func(receivedMessage spi.Message) error {
- defer func(transaction *spi.RequestTransaction)
{
+ defer func(transaction spi.RequestTransaction) {
// This is just to make sure we don't
forget to close the transaction here
_ = transaction.EndRequest()
}(transaction)
diff --git a/plc4go/internal/cbus/Writer.go b/plc4go/internal/cbus/Writer.go
index e9257051fe..9029551317 100644
--- a/plc4go/internal/cbus/Writer.go
+++ b/plc4go/internal/cbus/Writer.go
@@ -35,10 +35,10 @@ import (
type Writer struct {
alphaGenerator *AlphaGenerator
messageCodec spi.MessageCodec
- tm *spi.RequestTransactionManager
+ tm spi.RequestTransactionManager
}
-func NewWriter(tpduGenerator *AlphaGenerator, messageCodec spi.MessageCodec,
tm *spi.RequestTransactionManager) Writer {
+func NewWriter(tpduGenerator *AlphaGenerator, messageCodec spi.MessageCodec,
tm spi.RequestTransactionManager) Writer {
return Writer{
alphaGenerator: tpduGenerator,
messageCodec: messageCodec,
diff --git a/plc4go/internal/eip/Connection.go
b/plc4go/internal/eip/Connection.go
index b3c7ccd7ce..cbff5440f4 100644
--- a/plc4go/internal/eip/Connection.go
+++ b/plc4go/internal/eip/Connection.go
@@ -45,7 +45,7 @@ type Connection struct {
messageCodec spi.MessageCodec
configuration Configuration
driverContext DriverContext
- tm *spi.RequestTransactionManager
+ tm spi.RequestTransactionManager
sessionHandle uint32
senderContext []uint8
connectionId uint32
@@ -58,7 +58,7 @@ type Connection struct {
tracer *spi.Tracer
}
-func NewConnection(messageCodec spi.MessageCodec, configuration Configuration,
driverContext DriverContext, tagHandler spi.PlcTagHandler, tm
*spi.RequestTransactionManager, options map[string][]string) *Connection {
+func NewConnection(messageCodec spi.MessageCodec, configuration Configuration,
driverContext DriverContext, tagHandler spi.PlcTagHandler, tm
spi.RequestTransactionManager, options map[string][]string) *Connection {
connection := &Connection{
messageCodec: messageCodec,
configuration: configuration,
diff --git a/plc4go/internal/eip/EipDriver.go b/plc4go/internal/eip/EipDriver.go
index 0483d3b25e..336fbf7808 100644
--- a/plc4go/internal/eip/EipDriver.go
+++ b/plc4go/internal/eip/EipDriver.go
@@ -40,7 +40,7 @@ type Driver struct {
func NewDriver() plc4go.PlcDriver {
driver := &Driver{
- tm: *spi.NewRequestTransactionManager(1),
+ tm: spi.NewRequestTransactionManager(1),
awaitSetupComplete: true,
awaitDisconnectComplete: true,
}
@@ -99,7 +99,7 @@ func (m *Driver) GetConnectionWithContext(ctx
context.Context, transportUrl url.
driverContext.awaitDisconnectComplete = m.awaitDisconnectComplete
// Create the new connection
- connection := NewConnection(codec, configuration, driverContext,
m.GetPlcTagHandler(), &m.tm, options)
+ connection := NewConnection(codec, configuration, driverContext,
m.GetPlcTagHandler(), m.tm, options)
log.Debug().Msg("created connection, connecting now")
return connection.ConnectWithContext(ctx)
}
diff --git a/plc4go/internal/eip/Reader.go b/plc4go/internal/eip/Reader.go
index 2cf06398c8..6a77f8f6cc 100644
--- a/plc4go/internal/eip/Reader.go
+++ b/plc4go/internal/eip/Reader.go
@@ -40,12 +40,12 @@ import (
type Reader struct {
messageCodec spi.MessageCodec
- tm *spi.RequestTransactionManager
+ tm spi.RequestTransactionManager
configuration Configuration
sessionHandle *uint32
}
-func NewReader(messageCodec spi.MessageCodec, tm
*spi.RequestTransactionManager, configuration Configuration, sessionHandle
*uint32) *Reader {
+func NewReader(messageCodec spi.MessageCodec, tm
spi.RequestTransactionManager, configuration Configuration, sessionHandle
*uint32) *Reader {
return &Reader{
messageCodec: messageCodec,
tm: tm,
diff --git a/plc4go/internal/eip/Writer.go b/plc4go/internal/eip/Writer.go
index 209a318c4c..b8d2d10d6e 100644
--- a/plc4go/internal/eip/Writer.go
+++ b/plc4go/internal/eip/Writer.go
@@ -36,13 +36,13 @@ import (
type Writer struct {
messageCodec spi.MessageCodec
- tm *spi.RequestTransactionManager
+ tm spi.RequestTransactionManager
configuration Configuration
sessionHandle *uint32
senderContext *[]uint8
}
-func NewWriter(messageCodec spi.MessageCodec, tm
*spi.RequestTransactionManager, configuration Configuration, sessionHandle
*uint32, senderContext *[]uint8) Writer {
+func NewWriter(messageCodec spi.MessageCodec, tm
spi.RequestTransactionManager, configuration Configuration, sessionHandle
*uint32, senderContext *[]uint8) Writer {
return Writer{
messageCodec: messageCodec,
tm: tm,
diff --git a/plc4go/internal/s7/Connection.go b/plc4go/internal/s7/Connection.go
index 97e00e90a8..819a72a4a4 100644
--- a/plc4go/internal/s7/Connection.go
+++ b/plc4go/internal/s7/Connection.go
@@ -60,13 +60,13 @@ type Connection struct {
messageCodec spi.MessageCodec
configuration Configuration
driverContext DriverContext
- tm *spi.RequestTransactionManager
+ tm spi.RequestTransactionManager
connectionId string
tracer *spi.Tracer
}
-func NewConnection(messageCodec spi.MessageCodec, configuration Configuration,
driverContext DriverContext, tagHandler spi.PlcTagHandler, tm
*spi.RequestTransactionManager, options map[string][]string) *Connection {
+func NewConnection(messageCodec spi.MessageCodec, configuration Configuration,
driverContext DriverContext, tagHandler spi.PlcTagHandler, tm
spi.RequestTransactionManager, options map[string][]string) *Connection {
connection := &Connection{
tpduGenerator: TpduGenerator{currentTpduId: 10},
messageCodec: messageCodec,
diff --git a/plc4go/internal/s7/Driver.go b/plc4go/internal/s7/Driver.go
index edb3801100..4c79b80457 100644
--- a/plc4go/internal/s7/Driver.go
+++ b/plc4go/internal/s7/Driver.go
@@ -40,7 +40,7 @@ type Driver struct {
func NewDriver() plc4go.PlcDriver {
driver := &Driver{
- tm: *spi.NewRequestTransactionManager(1),
+ tm: spi.NewRequestTransactionManager(1),
awaitSetupComplete: true,
awaitDisconnectComplete: true,
}
@@ -99,7 +99,7 @@ func (m *Driver) GetConnectionWithContext(ctx
context.Context, transportUrl url.
driverContext.awaitDisconnectComplete = m.awaitDisconnectComplete
// Create the new connection
- connection := NewConnection(codec, configuration, driverContext,
m.GetPlcTagHandler(), &m.tm, options)
+ connection := NewConnection(codec, configuration, driverContext,
m.GetPlcTagHandler(), m.tm, options)
log.Debug().Msg("created connection, connecting now")
return connection.ConnectWithContext(ctx)
}
diff --git a/plc4go/internal/s7/Reader.go b/plc4go/internal/s7/Reader.go
index a0c3aca945..3379200e78 100644
--- a/plc4go/internal/s7/Reader.go
+++ b/plc4go/internal/s7/Reader.go
@@ -36,10 +36,10 @@ import (
type Reader struct {
tpduGenerator *TpduGenerator
messageCodec spi.MessageCodec
- tm *spi.RequestTransactionManager
+ tm spi.RequestTransactionManager
}
-func NewReader(tpduGenerator *TpduGenerator, messageCodec spi.MessageCodec, tm
*spi.RequestTransactionManager) *Reader {
+func NewReader(tpduGenerator *TpduGenerator, messageCodec spi.MessageCodec, tm
spi.RequestTransactionManager) *Reader {
return &Reader{
tpduGenerator: tpduGenerator,
messageCodec: messageCodec,
diff --git a/plc4go/internal/s7/Writer.go b/plc4go/internal/s7/Writer.go
index cd33c379e7..e8bf221953 100644
--- a/plc4go/internal/s7/Writer.go
+++ b/plc4go/internal/s7/Writer.go
@@ -35,10 +35,10 @@ import (
type Writer struct {
tpduGenerator *TpduGenerator
messageCodec spi.MessageCodec
- tm *spi.RequestTransactionManager
+ tm spi.RequestTransactionManager
}
-func NewWriter(tpduGenerator *TpduGenerator, messageCodec spi.MessageCodec, tm
*spi.RequestTransactionManager) Writer {
+func NewWriter(tpduGenerator *TpduGenerator, messageCodec spi.MessageCodec, tm
spi.RequestTransactionManager) Writer {
return Writer{
tpduGenerator: tpduGenerator,
messageCodec: messageCodec,
diff --git a/plc4go/spi/RequestTransactionManager.go
b/plc4go/spi/RequestTransactionManager.go
index aca4dc4d41..1d4e93dde7 100644
--- a/plc4go/spi/RequestTransactionManager.go
+++ b/plc4go/spi/RequestTransactionManager.go
@@ -42,60 +42,87 @@ func init() {
sharedExecutorInstance.Start()
}
-type RequestTransaction struct {
- parent *RequestTransactionManager
- transactionId int32
-
- /** The initial operation to perform to kick off the request */
- operation utils.Runnable
- completionFuture utils.CompletionFuture
-
- transactionLog zerolog.Logger
-}
-
-func (t *RequestTransaction) String() string {
- return fmt.Sprintf("Transaction{tid:%d}", t.transactionId)
+// RequestTransaction represents a transaction
+type RequestTransaction interface {
+ fmt.Stringer
+ // FailRequest signals that this transaction has failed
+ FailRequest(err error) error
+ // EndRequest signals that this transaction is done
+ EndRequest() error
+ // Submit submits a Runnable to the RequestTransactionManager
+ Submit(operation utils.Runnable)
+ // AwaitCompletion wait for this RequestTransaction to finish. Returns
an error if it finished unsuccessful
+ AwaitCompletion(ctx context.Context) error
}
// RequestTransactionManager handles transactions
-type RequestTransactionManager struct {
- runningRequests []*RequestTransaction
- // How many Transactions are allowed to run at the same time?
- numberOfConcurrentRequests int
- // Assigns each request a Unique Transaction Id, especially important
for failure handling
- transactionId int32
- transactionMutex sync.RWMutex
- // Important, this is a FIFO Queue for Fairness!
- workLog list.List
- workLogMutex sync.RWMutex
- executor utils.Executor
+type RequestTransactionManager interface {
+ // SetNumberOfConcurrentRequests sets the number of concurrent requests
that will be sent out to a device
+ SetNumberOfConcurrentRequests(numberOfConcurrentRequests int)
+ // StartTransaction starts a RequestTransaction
+ StartTransaction() RequestTransaction
}
// NewRequestTransactionManager creates a new RequestTransactionManager
-func NewRequestTransactionManager(numberOfConcurrentRequests int,
requestTransactionManagerOptions ...RequestTransactionManagerOption)
*RequestTransactionManager {
- requestTransactionManager := &RequestTransactionManager{
+func NewRequestTransactionManager(numberOfConcurrentRequests int,
requestTransactionManagerOptions ...RequestTransactionManagerOption)
RequestTransactionManager {
+ _requestTransactionManager := &requestTransactionManager{
numberOfConcurrentRequests: numberOfConcurrentRequests,
transactionId: 0,
workLog: *list.New(),
executor: sharedExecutorInstance,
}
for _, requestTransactionManagerOption := range
requestTransactionManagerOptions {
- requestTransactionManagerOption(requestTransactionManager)
+ requestTransactionManagerOption(_requestTransactionManager)
}
- return requestTransactionManager
+ return _requestTransactionManager
}
-type RequestTransactionManagerOption func(requestTransactionManager
*RequestTransactionManager)
+type RequestTransactionManagerOption func(requestTransactionManager
*requestTransactionManager)
// WithCustomExecutor sets a custom Executor for the RequestTransactionManager
func WithCustomExecutor(executor utils.Executor)
RequestTransactionManagerOption {
- return func(requestTransactionManager *RequestTransactionManager) {
+ return func(requestTransactionManager *requestTransactionManager) {
requestTransactionManager.executor = executor
}
}
-// SetNumberOfConcurrentRequests sets the number of concurrent requests that
will be sent out to a device
-func (r *RequestTransactionManager)
SetNumberOfConcurrentRequests(numberOfConcurrentRequests int) {
+///////////////////////////////////////
+///////////////////////////////////////
+//
+// Internal section
+//
+
+type requestTransaction struct {
+ parent *requestTransactionManager
+ transactionId int32
+
+ /** The initial operation to perform to kick off the request */
+ operation utils.Runnable
+ completionFuture utils.CompletionFuture
+
+ transactionLog zerolog.Logger
+}
+
+type requestTransactionManager struct {
+ runningRequests []*requestTransaction
+ // How many Transactions are allowed to run at the same time?
+ numberOfConcurrentRequests int
+ // Assigns each request a Unique Transaction Id, especially important
for failure handling
+ transactionId int32
+ transactionMutex sync.RWMutex
+ // Important, this is a FIFO Queue for Fairness!
+ workLog list.List
+ workLogMutex sync.RWMutex
+ executor utils.Executor
+}
+
+//
+// Internal section
+//
+///////////////////////////////////////
+///////////////////////////////////////
+
+func (r *requestTransactionManager)
SetNumberOfConcurrentRequests(numberOfConcurrentRequests int) {
log.Info().Msgf("Setting new number of concurrent requests %d",
numberOfConcurrentRequests)
// If we reduced the number of concurrent requests and more requests
are in-flight
// than should be, at least log a warning.
@@ -109,7 +136,7 @@ func (r *RequestTransactionManager)
SetNumberOfConcurrentRequests(numberOfConcur
r.processWorklog()
}
-func (r *RequestTransactionManager) submitHandle(handle *RequestTransaction) {
+func (r *requestTransactionManager) submitHandle(handle *requestTransaction) {
if handle.operation == nil {
panic("invalid handle")
}
@@ -122,7 +149,7 @@ func (r *RequestTransactionManager) submitHandle(handle
*RequestTransaction) {
r.processWorklog()
}
-func (r *RequestTransactionManager) processWorklog() {
+func (r *requestTransactionManager) processWorklog() {
r.workLogMutex.RLock()
defer r.workLogMutex.RUnlock()
log.Debug().Msgf("Processing work log with size of %d (%d concurrent
requests allowed)", r.workLog.Len(), r.numberOfConcurrentRequests)
@@ -131,7 +158,7 @@ func (r *RequestTransactionManager) processWorklog() {
if front == nil {
return
}
- next := front.Value.(*RequestTransaction)
+ next := front.Value.(*requestTransaction)
log.Debug().Msgf("Handling next %v. (Adding to running requests
(length: %d))", next, len(r.runningRequests))
r.runningRequests = append(r.runningRequests, next)
completionFuture := r.executor.Submit(context.Background(),
next.transactionId, next.operation)
@@ -140,8 +167,7 @@ func (r *RequestTransactionManager) processWorklog() {
}
}
-// StartTransaction starts a RequestTransaction
-func (r *RequestTransactionManager) StartTransaction() *RequestTransaction {
+func (r *requestTransactionManager) StartTransaction() RequestTransaction {
r.transactionMutex.Lock()
defer r.transactionMutex.Unlock()
currentTransactionId := r.transactionId
@@ -150,7 +176,7 @@ func (r *RequestTransactionManager) StartTransaction()
*RequestTransaction {
if !config.TraceTransactionManagerTransactions {
transactionLogger = zerolog.Nop()
}
- return &RequestTransaction{
+ return &requestTransaction{
r,
currentTransactionId,
nil,
@@ -159,18 +185,18 @@ func (r *RequestTransactionManager) StartTransaction()
*RequestTransaction {
}
}
-func (r *RequestTransactionManager) getNumberOfActiveRequests() int {
+func (r *requestTransactionManager) getNumberOfActiveRequests() int {
return len(r.runningRequests)
}
-func (r *RequestTransactionManager) failRequest(transaction
*RequestTransaction, err error) error {
+func (r *requestTransactionManager) failRequest(transaction
*requestTransaction, err error) error {
// Try to fail it!
transaction.completionFuture.Cancel(true, err)
// End it
return r.endRequest(transaction)
}
-func (r *RequestTransactionManager) endRequest(transaction
*RequestTransaction) error {
+func (r *requestTransactionManager) endRequest(transaction
*requestTransaction) error {
transaction.transactionLog.Debug().Msg("Trying to find a existing
transaction")
found := false
index := -1
@@ -193,21 +219,18 @@ func (r *RequestTransactionManager)
endRequest(transaction *RequestTransaction)
return nil
}
-// FailRequest signals that this transaction has failed
-func (t *RequestTransaction) FailRequest(err error) error {
+func (t *requestTransaction) FailRequest(err error) error {
t.transactionLog.Trace().Msg("Fail the request")
return t.parent.failRequest(t, err)
}
-// EndRequest signals that this transaction is done
-func (t *RequestTransaction) EndRequest() error {
+func (t *requestTransaction) EndRequest() error {
t.transactionLog.Trace().Msg("Ending the request")
// Remove it from Running Requests
return t.parent.endRequest(t)
}
-// Submit submits a Runnable to the RequestTransactionManager
-func (t *RequestTransaction) Submit(operation utils.Runnable) {
+func (t *requestTransaction) Submit(operation utils.Runnable) {
if t.operation != nil {
panic("Operation already set")
}
@@ -220,8 +243,7 @@ func (t *RequestTransaction) Submit(operation
utils.Runnable) {
t.parent.submitHandle(t)
}
-// AwaitCompletion wait for this RequestTransaction to finish. Returns an
error if it finished unsuccessful
-func (t *RequestTransaction) AwaitCompletion(ctx context.Context) error {
+func (t *requestTransaction) AwaitCompletion(ctx context.Context) error {
for t.completionFuture == nil {
time.Sleep(time.Millisecond * 10)
}
@@ -240,3 +262,7 @@ func (t *RequestTransaction) AwaitCompletion(ctx
context.Context) error {
}
return nil
}
+
+func (t *requestTransaction) String() string {
+ return fmt.Sprintf("Transaction{tid:%d}", t.transactionId)
+}
diff --git a/plc4go/spi/testutils/DriverTestRunner.go
b/plc4go/spi/testutils/DriverTestRunner.go
index 6e7969e433..9ee445e7cc 100644
--- a/plc4go/spi/testutils/DriverTestRunner.go
+++ b/plc4go/spi/testutils/DriverTestRunner.go
@@ -549,7 +549,9 @@ func RunDriverTestsuiteWithOptions(t *testing.T, driver
plc4go.PlcDriver, testPa
}
type ConnectionConnectAwaiter interface {
+ // SetAwaitSetupComplete sets a flag that the driver should await a
connection completion
SetAwaitSetupComplete(awaitComplete bool)
+ // SetAwaitDisconnectComplete sets a flag that the driver should await
a dis-connection completion
SetAwaitDisconnectComplete(awaitComplete bool)
}