This is an automated email from the ASF dual-hosted git repository.

cmorris pushed a commit to branch tendermint
in repository https://gitbox.apache.org/repos/asf/incubator-milagro-dta.git


The following commit(s) were added to refs/heads/tendermint by this push:
     new f3b6f18  Process history, green/red process status, better error 
handling for TX, trap invalid signatures in BLSVerify
     new bb3e96d  Merge branch 'tendermint' of 
ssh://github.com/apache/incubator-milagro-dta into tendermint
f3b6f18 is described below

commit f3b6f1806b70a712effd70f0ad09e66f35179302
Author: Christopher Morris <[email protected]>
AuthorDate: Thu Oct 10 12:22:29 2019 +0100

    Process history, green/red process status, better error handling for TX, 
trap invalid signatures in BLSVerify
---
 .gitignore                  |   1 +
 cmd/service/main.go         |  16 +++---
 go.mod                      |   3 +-
 go.sum                      |   2 +
 libs/documents/crypto.go    |   6 ++-
 pkg/api/proto.go            |   2 +
 pkg/tendermint/connector.go | 120 +++++++++++++++++++++++++++++++++++++++++---
 7 files changed, 135 insertions(+), 15 deletions(-)

diff --git a/.gitignore b/.gitignore
index 9d69337..889a299 100644
--- a/.gitignore
+++ b/.gitignore
@@ -2,3 +2,4 @@ target/
 vendor/
 libs/crypto/libpqnist/build/
 /cmd/service/__debug_bin
+/cmd/servicetester/ref
diff --git a/cmd/service/main.go b/cmd/service/main.go
index bb987f8..88a101d 100644
--- a/cmd/service/main.go
+++ b/cmd/service/main.go
@@ -219,19 +219,23 @@ func startDaemon(args []string) error {
                        case "none":
                                return nil
                        case "dump":
-                               svcPlugin.Dump(tx)
+                               return nil
+                               //                              return 
svcPlugin.Dump(tx)
                        case "v1/fulfill/order":
-                               svcPlugin.FulfillOrder(tx)
+                               _, err := svcPlugin.FulfillOrder(tx)
+                               return err
                        case "v1/order2":
-                               svcPlugin.Order2(tx)
+                               _, err := svcPlugin.Order2(tx)
+                               return err
                        case "v1/fulfill/order/secret":
-                               svcPlugin.FulfillOrderSecret(tx)
+                               _, err := svcPlugin.FulfillOrderSecret(tx)
+                               return err
                        case "v1/order/secret2":
-                               svcPlugin.OrderSecret2(tx)
+                               _, err := svcPlugin.OrderSecret2(tx)
+                               return err
                        default:
                                return errors.New("Unknown processor")
                        }
-                       return nil
                }
 
                logger.Info("Starting Blockchain listener to node: %v", 
cfg.Blockchain.BroadcastNode)
diff --git a/go.mod b/go.mod
index dd70335..7e241de 100644
--- a/go.mod
+++ b/go.mod
@@ -5,7 +5,7 @@ require (
        github.com/btcsuite/btcd v0.0.0-20190824003749-130ea5bddde3
        github.com/btcsuite/btcutil v0.0.0-20190425235716-9e5f4b9a998d
        github.com/coreos/go-oidc v2.0.0+incompatible
-       github.com/fatih/color v1.7.0 // indirect
+       github.com/fatih/color v1.7.0
        github.com/go-kit/kit v0.9.0
        github.com/go-playground/locales v0.12.1 // indirect
        github.com/go-playground/universal-translator v0.16.0 // indirect
@@ -14,6 +14,7 @@ require (
        github.com/gogo/protobuf v1.3.0
        github.com/golang/protobuf v1.3.2
        github.com/google/uuid v1.1.1
+       github.com/gookit/color v1.2.0
        github.com/gorilla/mux v1.7.3
        github.com/hokaccha/go-prettyjson v0.0.0-20190818114111-108c894c2c0e
        github.com/leodido/go-urn v1.1.0 // indirect
diff --git a/go.sum b/go.sum
index 58723e7..583a3c3 100644
--- a/go.sum
+++ b/go.sum
@@ -83,6 +83,8 @@ github.com/google/go-cmp v0.2.0/go.mod 
h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5a
 github.com/google/gofuzz v1.0.0/go.mod 
h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
 github.com/google/uuid v1.1.1 h1:Gkbcsh/GbpXz7lPftLA3P6TYMwjCLYm83jiFQZF/3gY=
 github.com/google/uuid v1.1.1/go.mod 
h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
+github.com/gookit/color v1.2.0 h1:lHA77Kuyi5JpBnA9ESvwkY+nanLjRZ0mHbWQXRYk2Lk=
+github.com/gookit/color v1.2.0/go.mod 
h1:AhIE+pS6D4Ql0SQWbBeXPHw7gY0/sjHoA4s/n1KB7xg=
 github.com/gorilla/mux v1.7.3 h1:gnP5JzjVOuiZD07fKKToCAOjS0yOpj/qPETTXCCS6hw=
 github.com/gorilla/mux v1.7.3/go.mod 
h1:1lud6UwP+6orDFRuTfBEV8e9/aOM/c4fVVCaMa2zaAs=
 github.com/gorilla/websocket v1.4.0/go.mod 
h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ=
diff --git a/libs/documents/crypto.go b/libs/documents/crypto.go
index dde1e29..f0ce2f4 100644
--- a/libs/documents/crypto.go
+++ b/libs/documents/crypto.go
@@ -136,11 +136,15 @@ func Verify(signedEnvelope SignedEnvelope, blsPK []byte) 
error {
        message := signedEnvelope.Message
        signature := signedEnvelope.Signature
 
+       if len(signature) == 0 {
+               return errors.New("Invalid Signature")
+       }
+
        rc := crypto.BLSVerify(message, blsPK, signature)
        if rc == 0 {
                return nil
        }
-       return errors.New("invalid signature")
+       return errors.New("Invalid signature")
 }
 
 // Appends padding.
diff --git a/pkg/api/proto.go b/pkg/api/proto.go
index 57209a8..54c3a7b 100644
--- a/pkg/api/proto.go
+++ b/pkg/api/proto.go
@@ -56,6 +56,8 @@ const (
 //BlockChainTX - struct for on chain req/resp
 type BlockChainTX struct {
        Processor              string
+       Height                 int64
+       Index                  uint32
        SenderID               string
        RecipientID            string
        AdditionalRecipientIDs []string
diff --git a/pkg/tendermint/connector.go b/pkg/tendermint/connector.go
index bf2f7e7..4810266 100644
--- a/pkg/tendermint/connector.go
+++ b/pkg/tendermint/connector.go
@@ -15,6 +15,7 @@ import (
        "github.com/apache/incubator-milagro-dta/libs/logger"
        "github.com/apache/incubator-milagro-dta/pkg/api"
        status "github.com/apache/incubator-milagro-dta/pkg/tendermint/status"
+       "github.com/fatih/color"
        "github.com/pkg/errors"
        tmclient "github.com/tendermint/tendermint/rpc/client"
        tmtypes "github.com/tendermint/tendermint/types"
@@ -155,15 +156,13 @@ func (nc *NodeConnector) Subscribe(ctx context.Context, 
processFn ProcessTXFunc)
                return errors.Wrap(err, "Failed to obtain latest blockheight of 
Blockchain")
        }
 
-       var processedToHeight int
-       if err := nc.store.Get("chain", "height", &processedToHeight); err != 
nil {
+       var processedTo string
+       if err := nc.store.Get("chain", "height", &processedTo); err != nil {
                if err != datastore.ErrKeyNotFound {
                        return errors.Wrap(err, "Get last processed block 
height")
                }
        }
 
-       nc.log.Debug("Block height: Current: %v; Processed: %v", 
currentBlockHeight, processedToHeight)
-
        // create the transaction queue chan
        txQueue := make(chan *api.BlockChainTX, txChanSize)
 
@@ -172,12 +171,100 @@ func (nc *NodeConnector) Subscribe(ctx context.Context, 
processFn ProcessTXFunc)
                return err
        }
 
+       nc.loadMissingHistory(currentBlockHeight, processedTo, processFn)
        // TODO: load historicTX
 
        // Process events
        return nc.processTXQueue(ctx, txQueue, processFn)
 }
 
+func decodeProcessedTo(processedTo string) (processedToHeight int64, 
processedToIndex uint32, err error) {
+       pth := strings.Split(processedTo, ".")
+
+       if len(pth) == 2 {
+               processedToHeight, err = strconv.ParseInt(pth[0], 10, 64)
+               if err != nil {
+                       return 0, 0, errors.Wrapf(err, "Can't decode processed 
to Height %s", processedTo)
+               }
+               procindex64, err := strconv.ParseUint(pth[1], 10, 32)
+               if err != nil {
+                       return 0, 0, errors.Wrapf(err, "Can't decode processed 
to Index %s", processedTo)
+               }
+               processedToIndex = uint32(procindex64)
+               return processedToHeight, processedToIndex, nil
+       }
+       return 0, 0, nil
+
+}
+
+func (nc *NodeConnector) loadMissingHistory(currentBlockHeight int, 
processedTo string, processFn ProcessTXFunc) error {
+       nc.log.Debug("Block height: Current: %v; Processed: %s", 
currentBlockHeight, processedTo)
+       processedToHeight, processedToIndex, err := 
decodeProcessedTo(processedTo)
+       if err != nil {
+               return err
+       }
+
+       //Open a 2nd websocket client
+       tmNodeAddr := strings.TrimRight(nc.tmNodeAddr, "/")
+       tmHistoryClient := tmclient.NewHTTP(fmt.Sprintf("tcp://%s", 
tmNodeAddr), "/websocket")
+       if err := tmHistoryClient.Start(); err != nil {
+               return errors.Wrap(err, "Start tendermint history client")
+       }
+
+       currentPage := 1
+       query := fmt.Sprintf("tag.recipient='%v' AND tag.sender='%v' AND 
tx.height>=%d AND tx.height<=%d", nc.nodeID, nc.nodeID, processedToHeight, 
currentBlockHeight)
+       numPerPage := 5
+
+       for {
+               result, err := tmHistoryClient.TxSearch(query, true, 
currentPage, numPerPage)
+               if err != nil {
+                       return errors.Wrapf(err, "Failed to subscribe to query 
%s", query)
+               }
+
+               for _, chainTx := range result.Txs {
+
+                       tx := chainTx.Tx
+
+                       payload := &api.BlockChainTX{}
+                       err := json.Unmarshal(tx, payload)
+                       if err != nil {
+                               nc.log.Debug("IGNORED TX - Invalid!")
+                               break
+                       }
+                       payload.Index = chainTx.Index
+                       payload.Height = chainTx.Height
+
+                       //processedTo check
+                       if payload.Height < processedToHeight {
+                               continue
+                       }
+                       if payload.Height == processedToHeight && payload.Index 
<= processedToIndex {
+                               continue
+                       }
+
+                       //Dont queue just process directly
+
+                       if err := processFn(payload); err != nil {
+                               msg := fmt.Sprintf("HISTORY %s Block:%v 
Index:%v Error:%v", color.RedString("FAILURE"), chainTx.Height, chainTx.Index, 
err)
+                               nc.log.Info(msg)
+                       } else {
+                               msg := fmt.Sprintf("HISTORY %s Block:%v 
Index:%v", color.GreenString("PROCESSED"), chainTx.Height, chainTx.Index)
+                               nc.log.Info(msg)
+                       }
+
+                       if err := nc.updateProcessedUpToHeight(chainTx.Height, 
chainTx.Index); err != nil {
+                               return err
+                       }
+
+               }
+               if currentPage*numPerPage > result.TotalCount {
+                       break
+               }
+               currentPage++
+       }
+       return nil
+}
+
 func (nc *NodeConnector) subscribeAndQueue(ctx context.Context, txQueue chan 
*api.BlockChainTX) error {
        query := "tag.recipient='" + nc.nodeID + "'"
 
@@ -194,6 +281,9 @@ func (nc *NodeConnector) subscribeAndQueue(ctx 
context.Context, txQueue chan *ap
                                payload := &api.BlockChainTX{}
 
                                err := json.Unmarshal(tx, payload)
+                               payload.Height = 
result.Data.(tmtypes.EventDataTx).Height
+                               payload.Index = 
result.Data.(tmtypes.EventDataTx).Index
+
                                if err != nil {
                                        nc.log.Debug("IGNORED TX - Invalid!")
                                        break
@@ -221,11 +311,19 @@ func (nc *NodeConnector) subscribeAndQueue(ctx 
context.Context, txQueue chan *ap
 func (nc *NodeConnector) processTXQueue(ctx context.Context, txQueue chan 
*api.BlockChainTX, processFn ProcessTXFunc) error {
        for {
                select {
-               case tx := <-txQueue:
-                       if err := processFn(tx); err != nil {
-                               // TODO: errors block processing the queue
+               case chainTx := <-txQueue:
+
+                       if err := processFn(chainTx); err != nil {
+                               msg := fmt.Sprintf("TX %s Block:%v Index:%v 
Error:%v", color.RedString("FAILURE"), chainTx.Height, chainTx.Index, err)
+                               nc.log.Info(msg)
+                       } else {
+                               msg := fmt.Sprintf("TX %s Block:%v Index:%v", 
color.GreenString("PROCESSED"), chainTx.Height, chainTx.Index)
+                               nc.log.Info(msg)
+                       }
+                       if err := nc.updateProcessedUpToHeight(chainTx.Height, 
chainTx.Index); err != nil {
                                return err
                        }
+
                        // TODO: store the last block height
                case <-ctx.Done():
                        return nil
@@ -233,6 +331,14 @@ func (nc *NodeConnector) processTXQueue(ctx 
context.Context, txQueue chan *api.B
        }
 }
 
+func (nc *NodeConnector) updateProcessedUpToHeight(height int64, index uint32) 
error {
+       processedTo := fmt.Sprintf("%v.%v", height, index)
+       if err := nc.store.Set("chain", "height", &processedTo, nil); err != 
nil {
+               return errors.Wrapf(err, "Failed to update processed up to %s 
", processedTo)
+       }
+       return nil
+}
+
 func (nc *NodeConnector) getChainStatus() (*status.StatusResponse, error) {
        url := fmt.Sprintf("http://%s/status";, nc.tmNodeAddr)
        resp, err := nc.httpClient.Get(url)

Reply via email to