This is an automated email from the ASF dual-hosted git repository.
cmorris pushed a commit to branch tendermint
in repository https://gitbox.apache.org/repos/asf/incubator-milagro-dta.git
The following commit(s) were added to refs/heads/tendermint by this push:
new f3b6f18 Process history, green/red process status, better error handling for TX, trap invalid signatures in BLSVerify
new bb3e96d Merge branch 'tendermint' of ssh://github.com/apache/incubator-milagro-dta into tendermint
f3b6f18 is described below
commit f3b6f1806b70a712effd70f0ad09e66f35179302
Author: Christopher Morris <[email protected]>
AuthorDate: Thu Oct 10 12:22:29 2019 +0100
Process history, green/red process status, better error handling for TX,
trap invalid signatures in BLSVerify
---
.gitignore | 1 +
cmd/service/main.go | 16 +++---
go.mod | 3 +-
go.sum | 2 +
libs/documents/crypto.go | 6 ++-
pkg/api/proto.go | 2 +
pkg/tendermint/connector.go | 120 +++++++++++++++++++++++++++++++++++++++++---
7 files changed, 135 insertions(+), 15 deletions(-)
diff --git a/.gitignore b/.gitignore
index 9d69337..889a299 100644
--- a/.gitignore
+++ b/.gitignore
@@ -2,3 +2,4 @@ target/
vendor/
libs/crypto/libpqnist/build/
/cmd/service/__debug_bin
+/cmd/servicetester/ref
diff --git a/cmd/service/main.go b/cmd/service/main.go
index bb987f8..88a101d 100644
--- a/cmd/service/main.go
+++ b/cmd/service/main.go
@@ -219,19 +219,23 @@ func startDaemon(args []string) error {
case "none":
return nil
case "dump":
- svcPlugin.Dump(tx)
+ return nil
+ // return svcPlugin.Dump(tx)
case "v1/fulfill/order":
- svcPlugin.FulfillOrder(tx)
+ _, err := svcPlugin.FulfillOrder(tx)
+ return err
case "v1/order2":
- svcPlugin.Order2(tx)
+ _, err := svcPlugin.Order2(tx)
+ return err
case "v1/fulfill/order/secret":
- svcPlugin.FulfillOrderSecret(tx)
+ _, err := svcPlugin.FulfillOrderSecret(tx)
+ return err
case "v1/order/secret2":
- svcPlugin.OrderSecret2(tx)
+ _, err := svcPlugin.OrderSecret2(tx)
+ return err
default:
return errors.New("Unknown processor")
}
- return nil
}
logger.Info("Starting Blockchain listener to node: %v",
cfg.Blockchain.BroadcastNode)
diff --git a/go.mod b/go.mod
index dd70335..7e241de 100644
--- a/go.mod
+++ b/go.mod
@@ -5,7 +5,7 @@ require (
github.com/btcsuite/btcd v0.0.0-20190824003749-130ea5bddde3
github.com/btcsuite/btcutil v0.0.0-20190425235716-9e5f4b9a998d
github.com/coreos/go-oidc v2.0.0+incompatible
- github.com/fatih/color v1.7.0 // indirect
+ github.com/fatih/color v1.7.0
github.com/go-kit/kit v0.9.0
github.com/go-playground/locales v0.12.1 // indirect
github.com/go-playground/universal-translator v0.16.0 // indirect
@@ -14,6 +14,7 @@ require (
github.com/gogo/protobuf v1.3.0
github.com/golang/protobuf v1.3.2
github.com/google/uuid v1.1.1
+ github.com/gookit/color v1.2.0
github.com/gorilla/mux v1.7.3
github.com/hokaccha/go-prettyjson v0.0.0-20190818114111-108c894c2c0e
github.com/leodido/go-urn v1.1.0 // indirect
diff --git a/go.sum b/go.sum
index 58723e7..583a3c3 100644
--- a/go.sum
+++ b/go.sum
@@ -83,6 +83,8 @@ github.com/google/go-cmp v0.2.0/go.mod
h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5a
github.com/google/gofuzz v1.0.0/go.mod
h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
github.com/google/uuid v1.1.1 h1:Gkbcsh/GbpXz7lPftLA3P6TYMwjCLYm83jiFQZF/3gY=
github.com/google/uuid v1.1.1/go.mod
h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
+github.com/gookit/color v1.2.0 h1:lHA77Kuyi5JpBnA9ESvwkY+nanLjRZ0mHbWQXRYk2Lk=
+github.com/gookit/color v1.2.0/go.mod
h1:AhIE+pS6D4Ql0SQWbBeXPHw7gY0/sjHoA4s/n1KB7xg=
github.com/gorilla/mux v1.7.3 h1:gnP5JzjVOuiZD07fKKToCAOjS0yOpj/qPETTXCCS6hw=
github.com/gorilla/mux v1.7.3/go.mod
h1:1lud6UwP+6orDFRuTfBEV8e9/aOM/c4fVVCaMa2zaAs=
github.com/gorilla/websocket v1.4.0/go.mod
h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ=
diff --git a/libs/documents/crypto.go b/libs/documents/crypto.go
index dde1e29..f0ce2f4 100644
--- a/libs/documents/crypto.go
+++ b/libs/documents/crypto.go
@@ -136,11 +136,15 @@ func Verify(signedEnvelope SignedEnvelope, blsPK []byte)
error {
message := signedEnvelope.Message
signature := signedEnvelope.Signature
+ if len(signature) == 0 {
+ return errors.New("Invalid Signature")
+ }
+
rc := crypto.BLSVerify(message, blsPK, signature)
if rc == 0 {
return nil
}
- return errors.New("invalid signature")
+ return errors.New("Invalid signature")
}
// Appends padding.
diff --git a/pkg/api/proto.go b/pkg/api/proto.go
index 57209a8..54c3a7b 100644
--- a/pkg/api/proto.go
+++ b/pkg/api/proto.go
@@ -56,6 +56,8 @@ const (
//BlockChainTX - struct for on chain req/resp
type BlockChainTX struct {
Processor string
+ Height int64
+ Index uint32
SenderID string
RecipientID string
AdditionalRecipientIDs []string
diff --git a/pkg/tendermint/connector.go b/pkg/tendermint/connector.go
index bf2f7e7..4810266 100644
--- a/pkg/tendermint/connector.go
+++ b/pkg/tendermint/connector.go
@@ -15,6 +15,7 @@ import (
"github.com/apache/incubator-milagro-dta/libs/logger"
"github.com/apache/incubator-milagro-dta/pkg/api"
status "github.com/apache/incubator-milagro-dta/pkg/tendermint/status"
+ "github.com/fatih/color"
"github.com/pkg/errors"
tmclient "github.com/tendermint/tendermint/rpc/client"
tmtypes "github.com/tendermint/tendermint/types"
@@ -155,15 +156,13 @@ func (nc *NodeConnector) Subscribe(ctx context.Context,
processFn ProcessTXFunc)
return errors.Wrap(err, "Failed to obtain latest blockheight of Blockchain")
}
- var processedToHeight int
- if err := nc.store.Get("chain", "height", &processedToHeight); err !=
nil {
+ var processedTo string
+ if err := nc.store.Get("chain", "height", &processedTo); err != nil {
if err != datastore.ErrKeyNotFound {
return errors.Wrap(err, "Get last processed block
height")
}
}
- nc.log.Debug("Block height: Current: %v; Processed: %v",
currentBlockHeight, processedToHeight)
-
// create the transaction queue chan
txQueue := make(chan *api.BlockChainTX, txChanSize)
@@ -172,12 +171,100 @@ func (nc *NodeConnector) Subscribe(ctx context.Context,
processFn ProcessTXFunc)
return err
}
+ nc.loadMissingHistory(currentBlockHeight, processedTo, processFn)
// TODO: load historicTX
// Process events
return nc.processTXQueue(ctx, txQueue, processFn)
}
+func decodeProcessedTo(processedTo string) (processedToHeight int64,
processedToIndex uint32, err error) {
+ pth := strings.Split(processedTo, ".")
+
+ if len(pth) == 2 {
+ processedToHeight, err = strconv.ParseInt(pth[0], 10, 64)
+ if err != nil {
+ return 0, 0, errors.Wrapf(err, "Can't decode processed to Height %s", processedTo)
+ }
+ procindex64, err := strconv.ParseUint(pth[1], 10, 32)
+ if err != nil {
+ return 0, 0, errors.Wrapf(err, "Can't decode processed to Index %s", processedTo)
+ }
+ processedToIndex = uint32(procindex64)
+ return processedToHeight, processedToIndex, nil
+ }
+ return 0, 0, nil
+
+}
+
+func (nc *NodeConnector) loadMissingHistory(currentBlockHeight int,
processedTo string, processFn ProcessTXFunc) error {
+ nc.log.Debug("Block height: Current: %v; Processed: %s",
currentBlockHeight, processedTo)
+ processedToHeight, processedToIndex, err :=
decodeProcessedTo(processedTo)
+ if err != nil {
+ return err
+ }
+
+ //Open a 2nd websocket client
+ tmNodeAddr := strings.TrimRight(nc.tmNodeAddr, "/")
+ tmHistoryClient := tmclient.NewHTTP(fmt.Sprintf("tcp://%s",
tmNodeAddr), "/websocket")
+ if err := tmHistoryClient.Start(); err != nil {
+ return errors.Wrap(err, "Start tendermint history client")
+ }
+
+ currentPage := 1
+ query := fmt.Sprintf("tag.recipient='%v' AND tag.sender='%v' AND tx.height>=%d AND tx.height<=%d", nc.nodeID, nc.nodeID, processedToHeight, currentBlockHeight)
+ numPerPage := 5
+
+ for {
+ result, err := tmHistoryClient.TxSearch(query, true,
currentPage, numPerPage)
+ if err != nil {
+ return errors.Wrapf(err, "Failed to subscribe to query %s", query)
+ }
+
+ for _, chainTx := range result.Txs {
+
+ tx := chainTx.Tx
+
+ payload := &api.BlockChainTX{}
+ err := json.Unmarshal(tx, payload)
+ if err != nil {
+ nc.log.Debug("IGNORED TX - Invalid!")
+ break
+ }
+ payload.Index = chainTx.Index
+ payload.Height = chainTx.Height
+
+ //processedTo check
+ if payload.Height < processedToHeight {
+ continue
+ }
+ if payload.Height == processedToHeight && payload.Index
<= processedToIndex {
+ continue
+ }
+
+ //Dont queue just process directly
+
+ if err := processFn(payload); err != nil {
+ msg := fmt.Sprintf("HISTORY %s Block:%v Index:%v Error:%v", color.RedString("FAILURE"), chainTx.Height, chainTx.Index, err)
+ nc.log.Info(msg)
+ } else {
+ msg := fmt.Sprintf("HISTORY %s Block:%v Index:%v", color.GreenString("PROCESSED"), chainTx.Height, chainTx.Index)
+ nc.log.Info(msg)
+ }
+
+ if err := nc.updateProcessedUpToHeight(chainTx.Height,
chainTx.Index); err != nil {
+ return err
+ }
+
+ }
+ if currentPage*numPerPage > result.TotalCount {
+ break
+ }
+ currentPage++
+ }
+ return nil
+}
+
func (nc *NodeConnector) subscribeAndQueue(ctx context.Context, txQueue chan
*api.BlockChainTX) error {
query := "tag.recipient='" + nc.nodeID + "'"
@@ -194,6 +281,9 @@ func (nc *NodeConnector) subscribeAndQueue(ctx
context.Context, txQueue chan *ap
payload := &api.BlockChainTX{}
err := json.Unmarshal(tx, payload)
+ payload.Height =
result.Data.(tmtypes.EventDataTx).Height
+ payload.Index =
result.Data.(tmtypes.EventDataTx).Index
+
if err != nil {
nc.log.Debug("IGNORED TX - Invalid!")
break
@@ -221,11 +311,19 @@ func (nc *NodeConnector) subscribeAndQueue(ctx
context.Context, txQueue chan *ap
func (nc *NodeConnector) processTXQueue(ctx context.Context, txQueue chan
*api.BlockChainTX, processFn ProcessTXFunc) error {
for {
select {
- case tx := <-txQueue:
- if err := processFn(tx); err != nil {
- // TODO: errors block processing the queue
+ case chainTx := <-txQueue:
+
+ if err := processFn(chainTx); err != nil {
+ msg := fmt.Sprintf("TX %s Block:%v Index:%v Error:%v", color.RedString("FAILURE"), chainTx.Height, chainTx.Index, err)
+ nc.log.Info(msg)
+ } else {
+ msg := fmt.Sprintf("TX %s Block:%v Index:%v", color.GreenString("PROCESSED"), chainTx.Height, chainTx.Index)
+ nc.log.Info(msg)
+ }
+ if err := nc.updateProcessedUpToHeight(chainTx.Height,
chainTx.Index); err != nil {
return err
}
+
// TODO: store the last block height
case <-ctx.Done():
return nil
@@ -233,6 +331,14 @@ func (nc *NodeConnector) processTXQueue(ctx
context.Context, txQueue chan *api.B
}
}
+func (nc *NodeConnector) updateProcessedUpToHeight(height int64, index uint32)
error {
+ processedTo := fmt.Sprintf("%v.%v", height, index)
+ if err := nc.store.Set("chain", "height", &processedTo, nil); err !=
nil {
+ return errors.Wrapf(err, "Failed to update processed up to %s
", processedTo)
+ }
+ return nil
+}
+
func (nc *NodeConnector) getChainStatus() (*status.StatusResponse, error) {
url := fmt.Sprintf("http://%s/status", nc.tmNodeAddr)
resp, err := nc.httpClient.Get(url)