This is an automated email from the ASF dual-hosted git repository. sruehl pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/plc4x.git
The following commit(s) were added to refs/heads/develop by this push: new 2b89544 plc4go: Enabled last missing s7 test 2b89544 is described below commit 2b89544ab285d1add85f048e0975bd90f7f2b0a3 Author: Sebastian Rühl <sru...@apache.org> AuthorDate: Thu Apr 15 17:48:47 2021 +0200 plc4go: Enabled last missing s7 test + added global config flags/bools for TraceTransactionManagerTransactions and TraceDefaultMessageCodecWorker + fixed major issues in RequestTransactionManager.go --- go.mod | 2 +- go.sum | 2 + plc4go/cmd/main/drivers/tests/s7_driver_test.go | 4 +- plc4go/cmd/main/initializetest/init.go | 3 +- plc4go/internal/plc4go/s7/Reader.go | 2 +- plc4go/internal/plc4go/s7/Writer.go | 2 +- plc4go/internal/plc4go/spi/MessageCodec.go | 25 ++++- .../plc4go/spi/RequestTransactionManager.go | 117 +++++++++++---------- plc4go/pkg/plc4go/config/config.go | 4 + 9 files changed, 95 insertions(+), 66 deletions(-) diff --git a/go.mod b/go.mod index c5e2d6c..e3d4832 100644 --- a/go.mod +++ b/go.mod @@ -21,6 +21,6 @@ module github.com/apache/plc4x go 1.15 require ( - github.com/apache/plc4x/plc4go v0.0.0-20210414203637-8ceab5f36488 // indirect + github.com/apache/plc4x/plc4go v0.0.0-20210415110959-a3286c557e58 // indirect github.com/sirupsen/logrus v1.7.0 // indirect ) diff --git a/go.sum b/go.sum index 45f9197..158d146 100644 --- a/go.sum +++ b/go.sum @@ -62,6 +62,8 @@ github.com/apache/plc4x/plc4go v0.0.0-20210414142028-1a71fd546e6f h1:1yt/bAzOyiu github.com/apache/plc4x/plc4go v0.0.0-20210414142028-1a71fd546e6f/go.mod h1:NfO8uGKPGwDxn1GqOb4oNhAtPF7St1A9LRk1J/qSlWU= github.com/apache/plc4x/plc4go v0.0.0-20210414203637-8ceab5f36488 h1:nIIC6owfVBxfwjvYTzaTyT99Z5+29tL9M1J8lbFr+ow= github.com/apache/plc4x/plc4go v0.0.0-20210414203637-8ceab5f36488/go.mod h1:NfO8uGKPGwDxn1GqOb4oNhAtPF7St1A9LRk1J/qSlWU= +github.com/apache/plc4x/plc4go v0.0.0-20210415110959-a3286c557e58 h1:2xwfhviaiXnjRRQhBg1dFA+GskvpwhrIGdVAmpBw6A4= +github.com/apache/plc4x/plc4go v0.0.0-20210415110959-a3286c557e58/go.mod h1:NfO8uGKPGwDxn1GqOb4oNhAtPF7St1A9LRk1J/qSlWU= github.com/coreos/go-systemd v0.0.0-20190321100706-95778dfbb74e/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/dnephin/pflag v1.0.7/go.mod h1:uxE91IoWURlOiTUIA8Mq5ZZkAv3dPUfZNaT80Zm7OQE= diff --git a/plc4go/cmd/main/drivers/tests/s7_driver_test.go b/plc4go/cmd/main/drivers/tests/s7_driver_test.go index bac2bee..0dc12e3 100644 --- a/plc4go/cmd/main/drivers/tests/s7_driver_test.go +++ b/plc4go/cmd/main/drivers/tests/s7_driver_test.go @@ -26,7 +26,5 @@ import ( ) func TestS7Driver(t *testing.T) { - testutils.RunDriverTestsuite(t, s7.NewDriver(), "assets/testing/protocols/s7/DriverTestsuite.xml", - // TODO: this feature is still a WIP - "Single element read request with disabled PUT/GET") + testutils.RunDriverTestsuite(t, s7.NewDriver(), "assets/testing/protocols/s7/DriverTestsuite.xml") } diff --git a/plc4go/cmd/main/initializetest/init.go b/plc4go/cmd/main/initializetest/init.go index 7df81f6..ebdbdfb 100644 --- a/plc4go/cmd/main/initializetest/init.go +++ b/plc4go/cmd/main/initializetest/init.go @@ -30,6 +30,5 @@ func init() { //// Enable below if you want to see the filenames //With().Caller().Logger(). Output(zerolog.ConsoleWriter{Out: os.Stderr, NoColor: onJenkins}). - // TODO: set to INFO once log mining is done - Level(zerolog.TraceLevel) + Level(zerolog.InfoLevel) } diff --git a/plc4go/internal/plc4go/s7/Reader.go b/plc4go/internal/plc4go/s7/Reader.go index 2c6b73c..113a1f6 100644 --- a/plc4go/internal/plc4go/s7/Reader.go +++ b/plc4go/internal/plc4go/s7/Reader.go @@ -90,7 +90,7 @@ func (m *Reader) Read(readRequest model.PlcReadRequest) <-chan model.PlcReadRequ ), ) // Start a new request-transaction (Is ended in the response-handler) - transaction := m.tm.StartRequest() + transaction := m.tm.StartTransaction() transaction.Submit(func() { // Send the over the wire diff --git a/plc4go/internal/plc4go/s7/Writer.go b/plc4go/internal/plc4go/s7/Writer.go index 9ac4531..e06917e 100644 --- a/plc4go/internal/plc4go/s7/Writer.go +++ b/plc4go/internal/plc4go/s7/Writer.go @@ -95,7 +95,7 @@ func (m Writer) Write(writeRequest model.PlcWriteRequest) <-chan model.PlcWriteR ) // Start a new request-transaction (Is ended in the response-handler) - transaction := m.tm.StartRequest() + transaction := m.tm.StartTransaction() transaction.Submit(func() { // Send the over the wire if err := m.messageCodec.SendRequest( diff --git a/plc4go/internal/plc4go/spi/MessageCodec.go b/plc4go/internal/plc4go/spi/MessageCodec.go index 6c2dfc4..35b87be 100644 --- a/plc4go/internal/plc4go/spi/MessageCodec.go +++ b/plc4go/internal/plc4go/spi/MessageCodec.go @@ -22,7 +22,9 @@ import ( "fmt" "github.com/apache/plc4x/plc4go/internal/plc4go/spi/plcerrors" "github.com/apache/plc4x/plc4go/internal/plc4go/spi/transports" + "github.com/apache/plc4x/plc4go/pkg/plc4go/config" "github.com/pkg/errors" + "github.com/rs/zerolog" "github.com/rs/zerolog/log" "time" ) @@ -127,7 +129,9 @@ func (m *DefaultCodec) Connect() error { err := m.TransportInstance.Connect() if err == nil { if !m.Running { + log.Debug().Msg("Message codec currently not running") if m.CustomWorkLoop != nil { + log.Info().Msg("Starting with custom loop") go m.CustomWorkLoop(&m.DefaultCodecRequiredInterface) } else { go m.Work(&m.DefaultCodecRequiredInterface) @@ -220,30 +224,38 @@ func (m *DefaultCodec) Work(codec *DefaultCodecRequiredInterface) { log.Info().Msg("Keep running") m.Work(codec) }() + + workerLog := log.With().Logger() + if !config.TraceDefaultMessageCodecWorker { + workerLog = zerolog.Nop() + } // Start an endless loop mainLoop: for m.Running { + workerLog.Trace().Msg("Working") // Check for any expired expectations. // (Doing this outside the loop lets us expire expectations even if no input is coming in) now := time.Now() // Guard against empty expectations if len(m.Expectations) <= 0 { - log.Trace().Msg("we got no expectation") + workerLog.Trace().Msg("no available expectations") // Sleep for 10ms time.Sleep(time.Millisecond * 10) continue mainLoop } m.TimeoutExpectations(now) + workerLog.Trace().Msg("Receiving message") // Check for incoming messages. message, err := m.Receive() if err != nil { - log.Error().Err(err).Msg("got an error reading from transport") + workerLog.Error().Err(err).Msg("got an error reading from transport") time.Sleep(time.Millisecond * 10) continue mainLoop } if message == nil { + workerLog.Trace().Msg("Not enough data yet") // Sleep for 10ms before checking again, in order to not // consume 100% CPU Power. time.Sleep(time.Millisecond * 10) @@ -251,18 +263,25 @@ mainLoop: } if m.CustomMessageHandling != nil { + workerLog.Trace().Msg("Executing custom handling") if m.CustomMessageHandling(codec, message) { continue mainLoop } } + workerLog.Trace().Msg("Handle message") // Go through all expectations messageHandled := m.HandleMessages(message) // If the message has not been handled and a default handler is provided, call this ... if !messageHandled { + workerLog.Trace().Msg("Message was not handled") // TODO: how do we prevent endless blocking if there is no reader on this channel? - m.DefaultIncomingMessageChannel <- message + select { + case m.DefaultIncomingMessageChannel <- message: + default: + workerLog.Warn().Msg("Message discarded") + } } } } diff --git a/plc4go/internal/plc4go/spi/RequestTransactionManager.go b/plc4go/internal/plc4go/spi/RequestTransactionManager.go index bdf4024..f0624f4 100644 --- a/plc4go/internal/plc4go/spi/RequestTransactionManager.go +++ b/plc4go/internal/plc4go/spi/RequestTransactionManager.go @@ -21,11 +21,11 @@ package spi import ( "container/list" + "fmt" "github.com/apache/plc4x/plc4go/pkg/plc4go/config" "github.com/pkg/errors" "github.com/rs/zerolog" "github.com/rs/zerolog/log" - "reflect" "sync" "time" ) @@ -85,10 +85,15 @@ func (w Worker) work() { } type WorkItem struct { + transactionId int32 runnable Runnable completionFuture *CompletionFuture } +func (w WorkItem) String() string { + return fmt.Sprintf("Workitem{tid:%d}", w.transactionId) +} + type Executor struct { running bool shutdown bool @@ -119,12 +124,16 @@ func NewFixedSizeExecutor(numberOfWorkers int) *Executor { return &executor } -func (e *Executor) submit(runnable Runnable) *CompletionFuture { +func (e *Executor) submit(transactionId int32, runnable Runnable) *CompletionFuture { + log.Trace().Int32("transactionId", transactionId).Msg("Submitting runnable") completionFuture := &CompletionFuture{} + // TODO: add select and timeout if queue is full e.queue <- WorkItem{ + transactionId: transactionId, runnable: runnable, completionFuture: completionFuture, } + log.Trace().Int32("transactionId", transactionId).Msg("runnable queued") return completionFuture } @@ -163,12 +172,14 @@ type CompletionFuture struct { interruptRequested bool completed bool errored bool + err error } -func (f CompletionFuture) cancel(interrupt bool) { +func (f CompletionFuture) cancel(interrupt bool, err error) { f.cancelRequested = true f.interruptRequested = interrupt f.errored = true + f.err = err } func (f CompletionFuture) complete() { @@ -188,6 +199,12 @@ type RequestTransaction struct { /** The initial operation to perform to kick off the request */ operation Runnable completionFuture *CompletionFuture + + transactionLog zerolog.Logger +} + +func (t RequestTransaction) String() string { + return fmt.Sprintf("Transaction{tid:%d}", t.transactionId) } type RequestTransactionManager struct { @@ -215,6 +232,7 @@ func (r *RequestTransactionManager) getNumberOfConcurrentRequests() int { } func (r *RequestTransactionManager) SetNumberOfConcurrentRequests(numberOfConcurrentRequests int) { + log.Info().Msgf("Setting new number of concurrent requests %d", numberOfConcurrentRequests) // If we reduced the number of concurrent requests and more requests are in-flight // than should be, at least log a warning. if numberOfConcurrentRequests < len(r.runningRequests) { @@ -227,12 +245,6 @@ func (r *RequestTransactionManager) SetNumberOfConcurrentRequests(numberOfConcur r.processWorklog() } -func (r *RequestTransactionManager) submit(context func(RequestTransaction)) { - transaction := r.StartRequest() - context(*transaction) - // r.submitHandle(transaction); -} - func (r *RequestTransactionManager) submitHandle(handle *RequestTransaction) { if handle.operation == nil { panic("invalid handle") @@ -249,39 +261,57 @@ func (r *RequestTransactionManager) submitHandle(handle *RequestTransaction) { func (r *RequestTransactionManager) processWorklog() { r.worklogMutex.RLock() defer r.worklogMutex.RUnlock() - for len(r.runningRequests) < r.getNumberOfConcurrentRequests() && r.worklog.Len() > 0 { - next := r.worklog.Front().Value.(*RequestTransaction) + log.Debug().Msgf("Processing work log with size of %d (%d concurrent requests allowed)", r.worklog.Len(), r.numberOfConcurrentRequests) + for len(r.runningRequests) < r.numberOfConcurrentRequests && r.worklog.Len() > 0 { + front := r.worklog.Front() + if front == nil { + return + } + next := front.Value.(*RequestTransaction) + log.Debug().Msgf("Handling next %v. (Adding to running requests (length: %d))", next, len(r.runningRequests)) r.runningRequests = append(r.runningRequests, next) - completionFuture := executor.submit(next.operation) + completionFuture := executor.submit(next.transactionId, next.operation) next.completionFuture = completionFuture + r.worklog.Remove(front) } } -func (r *RequestTransactionManager) StartRequest() *RequestTransaction { +func (r *RequestTransactionManager) StartTransaction() *RequestTransaction { r.transationMutex.Lock() defer r.transationMutex.Unlock() currentTransactionId := r.transactionId r.transactionId += 1 - return &RequestTransaction{r, currentTransactionId, nil, nil} + transactionLogger := log.With().Int32("transactionId", currentTransactionId).Logger() + if !config.TraceTransactionManagerTransactions { + transactionLogger = zerolog.Nop() + } + return &RequestTransaction{ + r, + currentTransactionId, + nil, + nil, + transactionLogger, + } } func (r *RequestTransactionManager) getNumberOfActiveRequests() int { return len(r.runningRequests) } -func (r *RequestTransactionManager) failRequest(transaction *RequestTransaction) error { +func (r *RequestTransactionManager) failRequest(transaction *RequestTransaction, err error) error { // Try to fail it! - transaction.completionFuture.cancel(true) + transaction.completionFuture.cancel(true, err) // End it return r.endRequest(transaction) } func (r *RequestTransactionManager) endRequest(transaction *RequestTransaction) error { + transaction.transactionLog.Debug().Msg("Trying to find a existing transaction") found := false index := -1 for i, runningRequest := range r.runningRequests { - // TODO: check this implementation - if (&runningRequest) == (&transaction) { + if runningRequest.transactionId == transaction.transactionId { + transaction.transactionLog.Debug().Msg("Found a existing transaction") found = true index = i break @@ -290,61 +320,38 @@ func (r *RequestTransactionManager) endRequest(transaction *RequestTransaction) if !found { return errors.New("Unknown Transaction or Transaction already finished!") } + transaction.transactionLog.Debug().Msg("Removing the existing transaction transaction") r.runningRequests = append(r.runningRequests[:index], r.runningRequests[index+1:]...) // Process the worklog, a slot should be free now + transaction.transactionLog.Debug().Msg("Processing the worklog") r.processWorklog() return nil } -func (t *RequestTransaction) start() { -} - -func (t *RequestTransaction) failRequest(err error) error { - return t.parent.failRequest(t) +func (t *RequestTransaction) FailRequest(err error) error { + t.transactionLog.Trace().Msg("Fail the request") + return t.parent.failRequest(t, err) } func (t *RequestTransaction) EndRequest() error { + t.transactionLog.Trace().Msg("Ending the request") // Remove it from Running Requests return t.parent.endRequest(t) } -func (t *RequestTransaction) setOperation(operation Runnable) { - t.operation = operation -} - -func (t *RequestTransaction) getCompletionFuture() *CompletionFuture { - return t.completionFuture -} - -func (t *RequestTransaction) setCompletionFuture(completionFuture *CompletionFuture) { - t.completionFuture = completionFuture -} - func (t *RequestTransaction) Submit(operation Runnable) { - log.Trace().Msgf("Submission of transaction %d", t.transactionId) - t.setOperation(NewTransactionOperation(t.transactionId, operation)) - t.parent.submitHandle(t) -} - -func (t *RequestTransaction) equals(o *RequestTransaction) bool { - if t == o { - return true - } - if o == nil || reflect.TypeOf(t).Kind() != reflect.TypeOf(o).Kind() { - return false + if t.operation != nil { + panic("Operation already set") } - that := o - return t.transactionId == that.transactionId -} - -func (t *RequestTransaction) hashCode() int32 { - return t.transactionId + t.transactionLog.Trace().Msgf("Submission of transaction %d", t.transactionId) + t.operation = t.NewTransactionOperation(operation) + t.parent.submitHandle(t) } -func NewTransactionOperation(transactionId int32, delegate Runnable) Runnable { +func (t *RequestTransaction) NewTransactionOperation(delegate Runnable) Runnable { return func() { - log.Trace().Int32("transactionId", transactionId).Msgf("Start execution of transaction %d", transactionId) + t.transactionLog.Trace().Msgf("Start execution of transaction %d", t.transactionId) delegate() - log.Trace().Int32("transactionId", transactionId).Msgf("Completed execution of transaction %d", transactionId) + t.transactionLog.Trace().Msgf("Completed execution of transaction %d", t.transactionId) } } diff --git a/plc4go/pkg/plc4go/config/config.go b/plc4go/pkg/plc4go/config/config.go index 4f8049c..1aaf81e 100644 --- a/plc4go/pkg/plc4go/config/config.go +++ b/plc4go/pkg/plc4go/config/config.go @@ -21,7 +21,11 @@ package config // TraceTransactionManagerWorkers when set to true the transaction manager displays worker states in log var TraceTransactionManagerWorkers bool +var TraceTransactionManagerTransactions bool +var TraceDefaultMessageCodecWorker bool func init() { TraceTransactionManagerWorkers = false + TraceTransactionManagerTransactions = false + TraceDefaultMessageCodecWorker = false }