This is an automated email from the ASF dual-hosted git repository.

sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git


The following commit(s) were added to refs/heads/develop by this push:
     new 2b89544  plc4go: Enabled last missing s7 test
2b89544 is described below

commit 2b89544ab285d1add85f048e0975bd90f7f2b0a3
Author: Sebastian Rühl <sru...@apache.org>
AuthorDate: Thu Apr 15 17:48:47 2021 +0200

    plc4go: Enabled last missing s7 test
    
    + added global config flags/bools for TraceTransactionManagerTransactions and TraceDefaultMessageCodecWorker
    + fixed major issues in RequestTransactionManager.go
---
 go.mod                                             |   2 +-
 go.sum                                             |   2 +
 plc4go/cmd/main/drivers/tests/s7_driver_test.go    |   4 +-
 plc4go/cmd/main/initializetest/init.go             |   3 +-
 plc4go/internal/plc4go/s7/Reader.go                |   2 +-
 plc4go/internal/plc4go/s7/Writer.go                |   2 +-
 plc4go/internal/plc4go/spi/MessageCodec.go         |  25 ++++-
 .../plc4go/spi/RequestTransactionManager.go        | 117 +++++++++++----------
 plc4go/pkg/plc4go/config/config.go                 |   4 +
 9 files changed, 95 insertions(+), 66 deletions(-)

diff --git a/go.mod b/go.mod
index c5e2d6c..e3d4832 100644
--- a/go.mod
+++ b/go.mod
@@ -21,6 +21,6 @@ module github.com/apache/plc4x
 go 1.15
 
 require (
-       github.com/apache/plc4x/plc4go v0.0.0-20210414203637-8ceab5f36488 // 
indirect
+       github.com/apache/plc4x/plc4go v0.0.0-20210415110959-a3286c557e58 // 
indirect
        github.com/sirupsen/logrus v1.7.0 // indirect
 )
diff --git a/go.sum b/go.sum
index 45f9197..158d146 100644
--- a/go.sum
+++ b/go.sum
@@ -62,6 +62,8 @@ github.com/apache/plc4x/plc4go 
v0.0.0-20210414142028-1a71fd546e6f h1:1yt/bAzOyiu
 github.com/apache/plc4x/plc4go v0.0.0-20210414142028-1a71fd546e6f/go.mod 
h1:NfO8uGKPGwDxn1GqOb4oNhAtPF7St1A9LRk1J/qSlWU=
 github.com/apache/plc4x/plc4go v0.0.0-20210414203637-8ceab5f36488 
h1:nIIC6owfVBxfwjvYTzaTyT99Z5+29tL9M1J8lbFr+ow=
 github.com/apache/plc4x/plc4go v0.0.0-20210414203637-8ceab5f36488/go.mod 
h1:NfO8uGKPGwDxn1GqOb4oNhAtPF7St1A9LRk1J/qSlWU=
+github.com/apache/plc4x/plc4go v0.0.0-20210415110959-a3286c557e58 
h1:2xwfhviaiXnjRRQhBg1dFA+GskvpwhrIGdVAmpBw6A4=
+github.com/apache/plc4x/plc4go v0.0.0-20210415110959-a3286c557e58/go.mod 
h1:NfO8uGKPGwDxn1GqOb4oNhAtPF7St1A9LRk1J/qSlWU=
 github.com/coreos/go-systemd v0.0.0-20190321100706-95778dfbb74e/go.mod 
h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4=
 github.com/davecgh/go-spew v1.1.1/go.mod 
h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
 github.com/dnephin/pflag v1.0.7/go.mod 
h1:uxE91IoWURlOiTUIA8Mq5ZZkAv3dPUfZNaT80Zm7OQE=
diff --git a/plc4go/cmd/main/drivers/tests/s7_driver_test.go 
b/plc4go/cmd/main/drivers/tests/s7_driver_test.go
index bac2bee..0dc12e3 100644
--- a/plc4go/cmd/main/drivers/tests/s7_driver_test.go
+++ b/plc4go/cmd/main/drivers/tests/s7_driver_test.go
@@ -26,7 +26,5 @@ import (
 )
 
 func TestS7Driver(t *testing.T) {
-       testutils.RunDriverTestsuite(t, s7.NewDriver(), 
"assets/testing/protocols/s7/DriverTestsuite.xml",
-               // TODO: this feature is still a WIP
-               "Single element read request with disabled PUT/GET")
+       testutils.RunDriverTestsuite(t, s7.NewDriver(), 
"assets/testing/protocols/s7/DriverTestsuite.xml")
 }
diff --git a/plc4go/cmd/main/initializetest/init.go 
b/plc4go/cmd/main/initializetest/init.go
index 7df81f6..ebdbdfb 100644
--- a/plc4go/cmd/main/initializetest/init.go
+++ b/plc4go/cmd/main/initializetest/init.go
@@ -30,6 +30,5 @@ func init() {
                //// Enable below if you want to see the filenames
                //With().Caller().Logger().
                Output(zerolog.ConsoleWriter{Out: os.Stderr, NoColor: 
onJenkins}).
-               // TODO: set to INFO once log mining is done
-               Level(zerolog.TraceLevel)
+               Level(zerolog.InfoLevel)
 }
diff --git a/plc4go/internal/plc4go/s7/Reader.go 
b/plc4go/internal/plc4go/s7/Reader.go
index 2c6b73c..113a1f6 100644
--- a/plc4go/internal/plc4go/s7/Reader.go
+++ b/plc4go/internal/plc4go/s7/Reader.go
@@ -90,7 +90,7 @@ func (m *Reader) Read(readRequest model.PlcReadRequest) 
<-chan model.PlcReadRequ
                        ),
                )
                // Start a new request-transaction (Is ended in the 
response-handler)
-               transaction := m.tm.StartRequest()
+               transaction := m.tm.StartTransaction()
                transaction.Submit(func() {
 
                        // Send the  over the wire
diff --git a/plc4go/internal/plc4go/s7/Writer.go 
b/plc4go/internal/plc4go/s7/Writer.go
index 9ac4531..e06917e 100644
--- a/plc4go/internal/plc4go/s7/Writer.go
+++ b/plc4go/internal/plc4go/s7/Writer.go
@@ -95,7 +95,7 @@ func (m Writer) Write(writeRequest model.PlcWriteRequest) 
<-chan model.PlcWriteR
                )
 
                // Start a new request-transaction (Is ended in the 
response-handler)
-               transaction := m.tm.StartRequest()
+               transaction := m.tm.StartTransaction()
                transaction.Submit(func() {
                        // Send the  over the wire
                        if err := m.messageCodec.SendRequest(
diff --git a/plc4go/internal/plc4go/spi/MessageCodec.go 
b/plc4go/internal/plc4go/spi/MessageCodec.go
index 6c2dfc4..35b87be 100644
--- a/plc4go/internal/plc4go/spi/MessageCodec.go
+++ b/plc4go/internal/plc4go/spi/MessageCodec.go
@@ -22,7 +22,9 @@ import (
        "fmt"
        "github.com/apache/plc4x/plc4go/internal/plc4go/spi/plcerrors"
        "github.com/apache/plc4x/plc4go/internal/plc4go/spi/transports"
+       "github.com/apache/plc4x/plc4go/pkg/plc4go/config"
        "github.com/pkg/errors"
+       "github.com/rs/zerolog"
        "github.com/rs/zerolog/log"
        "time"
 )
@@ -127,7 +129,9 @@ func (m *DefaultCodec) Connect() error {
        err := m.TransportInstance.Connect()
        if err == nil {
                if !m.Running {
+                       log.Debug().Msg("Message codec currently not running")
                        if m.CustomWorkLoop != nil {
+                               log.Info().Msg("Starting with custom loop")
                                go 
m.CustomWorkLoop(&m.DefaultCodecRequiredInterface)
                        } else {
                                go m.Work(&m.DefaultCodecRequiredInterface)
@@ -220,30 +224,38 @@ func (m *DefaultCodec) Work(codec 
*DefaultCodecRequiredInterface) {
                log.Info().Msg("Keep running")
                m.Work(codec)
        }()
+
+       workerLog := log.With().Logger()
+       if !config.TraceDefaultMessageCodecWorker {
+               workerLog = zerolog.Nop()
+       }
        // Start an endless loop
 mainLoop:
        for m.Running {
+               workerLog.Trace().Msg("Working")
                // Check for any expired expectations.
                // (Doing this outside the loop lets us expire expectations 
even if no input is coming in)
                now := time.Now()
 
                // Guard against empty expectations
                if len(m.Expectations) <= 0 {
-                       log.Trace().Msg("we got no expectation")
+                       workerLog.Trace().Msg("no available expectations")
                        // Sleep for 10ms
                        time.Sleep(time.Millisecond * 10)
                        continue mainLoop
                }
                m.TimeoutExpectations(now)
 
+               workerLog.Trace().Msg("Receiving message")
                // Check for incoming messages.
                message, err := m.Receive()
                if err != nil {
-                       log.Error().Err(err).Msg("got an error reading from 
transport")
+                       workerLog.Error().Err(err).Msg("got an error reading 
from transport")
                        time.Sleep(time.Millisecond * 10)
                        continue mainLoop
                }
                if message == nil {
+                       workerLog.Trace().Msg("Not enough data yet")
                        // Sleep for 10ms before checking again, in order to not
                        // consume 100% CPU Power.
                        time.Sleep(time.Millisecond * 10)
@@ -251,18 +263,25 @@ mainLoop:
                }
 
                if m.CustomMessageHandling != nil {
+                       workerLog.Trace().Msg("Executing custom handling")
                        if m.CustomMessageHandling(codec, message) {
                                continue mainLoop
                        }
                }
 
+               workerLog.Trace().Msg("Handle message")
                // Go through all expectations
                messageHandled := m.HandleMessages(message)
 
                // If the message has not been handled and a default handler is 
provided, call this ...
                if !messageHandled {
+                       workerLog.Trace().Msg("Message was not handled")
                        // TODO: how do we prevent endless blocking if there is 
no reader on this channel?
-                       m.DefaultIncomingMessageChannel <- message
+                       select {
+                       case m.DefaultIncomingMessageChannel <- message:
+                       default:
+                               workerLog.Warn().Msg("Message discarded")
+                       }
                }
        }
 }
diff --git a/plc4go/internal/plc4go/spi/RequestTransactionManager.go 
b/plc4go/internal/plc4go/spi/RequestTransactionManager.go
index bdf4024..f0624f4 100644
--- a/plc4go/internal/plc4go/spi/RequestTransactionManager.go
+++ b/plc4go/internal/plc4go/spi/RequestTransactionManager.go
@@ -21,11 +21,11 @@ package spi
 
 import (
        "container/list"
+       "fmt"
        "github.com/apache/plc4x/plc4go/pkg/plc4go/config"
        "github.com/pkg/errors"
        "github.com/rs/zerolog"
        "github.com/rs/zerolog/log"
-       "reflect"
        "sync"
        "time"
 )
@@ -85,10 +85,15 @@ func (w Worker) work() {
 }
 
 type WorkItem struct {
+       transactionId    int32
        runnable         Runnable
        completionFuture *CompletionFuture
 }
 
+func (w WorkItem) String() string {
+       return fmt.Sprintf("Workitem{tid:%d}", w.transactionId)
+}
+
 type Executor struct {
        running     bool
        shutdown    bool
@@ -119,12 +124,16 @@ func NewFixedSizeExecutor(numberOfWorkers int) *Executor {
        return &executor
 }
 
-func (e *Executor) submit(runnable Runnable) *CompletionFuture {
+func (e *Executor) submit(transactionId int32, runnable Runnable) 
*CompletionFuture {
+       log.Trace().Int32("transactionId", transactionId).Msg("Submitting 
runnable")
        completionFuture := &CompletionFuture{}
+       // TODO: add select and timeout if queue is full
        e.queue <- WorkItem{
+               transactionId:    transactionId,
                runnable:         runnable,
                completionFuture: completionFuture,
        }
+       log.Trace().Int32("transactionId", transactionId).Msg("runnable queued")
        return completionFuture
 }
 
@@ -163,12 +172,14 @@ type CompletionFuture struct {
        interruptRequested bool
        completed          bool
        errored            bool
+       err                error
 }
 
-func (f CompletionFuture) cancel(interrupt bool) {
+func (f CompletionFuture) cancel(interrupt bool, err error) {
        f.cancelRequested = true
        f.interruptRequested = interrupt
        f.errored = true
+       f.err = err
 }
 
 func (f CompletionFuture) complete() {
@@ -188,6 +199,12 @@ type RequestTransaction struct {
        /** The initial operation to perform to kick off the request */
        operation        Runnable
        completionFuture *CompletionFuture
+
+       transactionLog zerolog.Logger
+}
+
+func (t RequestTransaction) String() string {
+       return fmt.Sprintf("Transaction{tid:%d}", t.transactionId)
 }
 
 type RequestTransactionManager struct {
@@ -215,6 +232,7 @@ func (r *RequestTransactionManager) 
getNumberOfConcurrentRequests() int {
 }
 
 func (r *RequestTransactionManager) 
SetNumberOfConcurrentRequests(numberOfConcurrentRequests int) {
+       log.Info().Msgf("Setting new number of concurrent requests %d", 
numberOfConcurrentRequests)
        // If we reduced the number of concurrent requests and more requests 
are in-flight
        // than should be, at least log a warning.
        if numberOfConcurrentRequests < len(r.runningRequests) {
@@ -227,12 +245,6 @@ func (r *RequestTransactionManager) 
SetNumberOfConcurrentRequests(numberOfConcur
        r.processWorklog()
 }
 
-func (r *RequestTransactionManager) submit(context func(RequestTransaction)) {
-       transaction := r.StartRequest()
-       context(*transaction)
-       // r.submitHandle(transaction);
-}
-
 func (r *RequestTransactionManager) submitHandle(handle *RequestTransaction) {
        if handle.operation == nil {
                panic("invalid handle")
@@ -249,39 +261,57 @@ func (r *RequestTransactionManager) submitHandle(handle 
*RequestTransaction) {
 func (r *RequestTransactionManager) processWorklog() {
        r.worklogMutex.RLock()
        defer r.worklogMutex.RUnlock()
-       for len(r.runningRequests) < r.getNumberOfConcurrentRequests() && 
r.worklog.Len() > 0 {
-               next := r.worklog.Front().Value.(*RequestTransaction)
+       log.Debug().Msgf("Processing work log with size of %d (%d concurrent 
requests allowed)", r.worklog.Len(), r.numberOfConcurrentRequests)
+       for len(r.runningRequests) < r.numberOfConcurrentRequests && 
r.worklog.Len() > 0 {
+               front := r.worklog.Front()
+               if front == nil {
+                       return
+               }
+               next := front.Value.(*RequestTransaction)
+               log.Debug().Msgf("Handling next %v. (Adding to running requests 
(length: %d))", next, len(r.runningRequests))
                r.runningRequests = append(r.runningRequests, next)
-               completionFuture := executor.submit(next.operation)
+               completionFuture := executor.submit(next.transactionId, 
next.operation)
                next.completionFuture = completionFuture
+               r.worklog.Remove(front)
        }
 }
 
-func (r *RequestTransactionManager) StartRequest() *RequestTransaction {
+func (r *RequestTransactionManager) StartTransaction() *RequestTransaction {
        r.transationMutex.Lock()
        defer r.transationMutex.Unlock()
        currentTransactionId := r.transactionId
        r.transactionId += 1
-       return &RequestTransaction{r, currentTransactionId, nil, nil}
+       transactionLogger := log.With().Int32("transactionId", 
currentTransactionId).Logger()
+       if !config.TraceTransactionManagerTransactions {
+               transactionLogger = zerolog.Nop()
+       }
+       return &RequestTransaction{
+               r,
+               currentTransactionId,
+               nil,
+               nil,
+               transactionLogger,
+       }
 }
 
 func (r *RequestTransactionManager) getNumberOfActiveRequests() int {
        return len(r.runningRequests)
 }
 
-func (r *RequestTransactionManager) failRequest(transaction 
*RequestTransaction) error {
+func (r *RequestTransactionManager) failRequest(transaction 
*RequestTransaction, err error) error {
        // Try to fail it!
-       transaction.completionFuture.cancel(true)
+       transaction.completionFuture.cancel(true, err)
        // End it
        return r.endRequest(transaction)
 }
 
 func (r *RequestTransactionManager) endRequest(transaction 
*RequestTransaction) error {
+       transaction.transactionLog.Debug().Msg("Trying to find a existing 
transaction")
        found := false
        index := -1
        for i, runningRequest := range r.runningRequests {
-               // TODO: check this implementation
-               if (&runningRequest) == (&transaction) {
+               if runningRequest.transactionId == transaction.transactionId {
+                       transaction.transactionLog.Debug().Msg("Found a 
existing transaction")
                        found = true
                        index = i
                        break
@@ -290,61 +320,38 @@ func (r *RequestTransactionManager) 
endRequest(transaction *RequestTransaction)
        if !found {
                return errors.New("Unknown Transaction or Transaction already 
finished!")
        }
+       transaction.transactionLog.Debug().Msg("Removing the existing 
transaction transaction")
        r.runningRequests = append(r.runningRequests[:index], 
r.runningRequests[index+1:]...)
        // Process the worklog, a slot should be free now
+       transaction.transactionLog.Debug().Msg("Processing the worklog")
        r.processWorklog()
        return nil
 }
 
-func (t *RequestTransaction) start() {
-}
-
-func (t *RequestTransaction) failRequest(err error) error {
-       return t.parent.failRequest(t)
+func (t *RequestTransaction) FailRequest(err error) error {
+       t.transactionLog.Trace().Msg("Fail the request")
+       return t.parent.failRequest(t, err)
 }
 
 func (t *RequestTransaction) EndRequest() error {
+       t.transactionLog.Trace().Msg("Ending the request")
        // Remove it from Running Requests
        return t.parent.endRequest(t)
 }
 
-func (t *RequestTransaction) setOperation(operation Runnable) {
-       t.operation = operation
-}
-
-func (t *RequestTransaction) getCompletionFuture() *CompletionFuture {
-       return t.completionFuture
-}
-
-func (t *RequestTransaction) setCompletionFuture(completionFuture 
*CompletionFuture) {
-       t.completionFuture = completionFuture
-}
-
 func (t *RequestTransaction) Submit(operation Runnable) {
-       log.Trace().Msgf("Submission of transaction %d", t.transactionId)
-       t.setOperation(NewTransactionOperation(t.transactionId, operation))
-       t.parent.submitHandle(t)
-}
-
-func (t *RequestTransaction) equals(o *RequestTransaction) bool {
-       if t == o {
-               return true
-       }
-       if o == nil || reflect.TypeOf(t).Kind() != reflect.TypeOf(o).Kind() {
-               return false
+       if t.operation != nil {
+               panic("Operation already set")
        }
-       that := o
-       return t.transactionId == that.transactionId
-}
-
-func (t *RequestTransaction) hashCode() int32 {
-       return t.transactionId
+       t.transactionLog.Trace().Msgf("Submission of transaction %d", 
t.transactionId)
+       t.operation = t.NewTransactionOperation(operation)
+       t.parent.submitHandle(t)
 }
 
-func NewTransactionOperation(transactionId int32, delegate Runnable) Runnable {
+func (t *RequestTransaction) NewTransactionOperation(delegate Runnable) 
Runnable {
        return func() {
-               log.Trace().Int32("transactionId", transactionId).Msgf("Start 
execution of transaction %d", transactionId)
+               t.transactionLog.Trace().Msgf("Start execution of transaction 
%d", t.transactionId)
                delegate()
-               log.Trace().Int32("transactionId", 
transactionId).Msgf("Completed execution of transaction %d", transactionId)
+               t.transactionLog.Trace().Msgf("Completed execution of 
transaction %d", t.transactionId)
        }
 }
diff --git a/plc4go/pkg/plc4go/config/config.go 
b/plc4go/pkg/plc4go/config/config.go
index 4f8049c..1aaf81e 100644
--- a/plc4go/pkg/plc4go/config/config.go
+++ b/plc4go/pkg/plc4go/config/config.go
@@ -21,7 +21,11 @@ package config
 
 // TraceTransactionManagerWorkers when set to true the transaction manager 
displays worker states in log
 var TraceTransactionManagerWorkers bool
+var TraceTransactionManagerTransactions bool
+var TraceDefaultMessageCodecWorker bool
 
 func init() {
        TraceTransactionManagerWorkers = false
+       TraceTransactionManagerTransactions = false
+       TraceDefaultMessageCodecWorker = false
 }

Reply via email to