This is an automated email from the ASF dual-hosted git repository. smihaylov pushed a commit to branch tendermint in repository https://gitbox.apache.org/repos/asf/incubator-milagro-dta.git
commit 4f81f8ccf9e65e10b1d31570f5748d29032039fa Author: Stanislav Mihaylov <[email protected]> AuthorDate: Mon Oct 7 15:38:09 2019 +0300 Refactor Tendermint package --- cmd/service/commands.go | 1 + cmd/service/main.go | 63 ++++++++-- cmd/servicetester/tendertest1.sh | 4 +- go.mod | 2 - go.sum | 5 +- libs/documents/docs.go | 2 +- pkg/api/proto.go | 10 ++ pkg/common/chain.go | 14 +-- pkg/common/common.go | 5 +- pkg/defaultservice/fulfillTX.go | 34 ++++-- pkg/defaultservice/init.go | 14 ++- pkg/defaultservice/order.go | 35 ++++-- pkg/defaultservice/orderTX.go | 45 +++++-- pkg/defaultservice/service.go | 13 +- pkg/tendermint/cmd/cmd | Bin 10230244 -> 0 bytes pkg/tendermint/config.go | 6 - pkg/tendermint/connector.go | 245 ++++++++++++++++++++++++++++++++++++++ pkg/tendermint/tendermint.go | 165 ------------------------- pkg/tendermint/tendermint_test.go | 60 ---------- pkg/tendermint/websockets.go | 204 ------------------------------- 20 files changed, 424 insertions(+), 503 deletions(-) diff --git a/cmd/service/commands.go b/cmd/service/commands.go index d3d16bd..20a9300 100644 --- a/cmd/service/commands.go +++ b/cmd/service/commands.go @@ -78,6 +78,7 @@ func parseConfig(args []string) (*config.Config, error) { fs := flag.NewFlagSet("daemon", flag.ExitOnError) fs.StringVar(&(cfg.Plugins.Service), "service", cfg.Plugins.Service, "Service plugin") + fs.StringVar(&(cfg.Log.Level), "log-level", cfg.Log.Level, "Log level") if err := fs.Parse(args); err != nil { return nil, err diff --git a/cmd/service/main.go b/cmd/service/main.go index 41bb4a3..9c77efe 100644 --- a/cmd/service/main.go +++ b/cmd/service/main.go @@ -21,6 +21,7 @@ Package main - handles config, initialisation and starts the service daemon package main import ( + "context" "crypto/rand" "fmt" "net/http" @@ -36,6 +37,7 @@ import ( "github.com/apache/incubator-milagro-dta/libs/ipfs" "github.com/apache/incubator-milagro-dta/libs/logger" "github.com/apache/incubator-milagro-dta/libs/transport" + "github.com/apache/incubator-milagro-dta/pkg/api" "github.com/apache/incubator-milagro-dta/pkg/config" "github.com/apache/incubator-milagro-dta/pkg/defaultservice" "github.com/apache/incubator-milagro-dta/pkg/endpoints" @@ -138,11 +140,12 @@ func startDaemon(args []string) error { cfg.Log.Format, cfg.Log.Level, ) - if err != nil { return errors.Wrap(err, "init logger") } + logger.Debug("Logger in DEBUG mode!") + // Create KV store logger.Info("Datastore type: %s", cfg.Node.Datastore) store, err := initDataStore(cfg.Node.Datastore) @@ -187,6 +190,12 @@ func startDaemon(args []string) error { } } + // Init Tendermint node connector + tmConnector, err := tendermint.NewNodeConnector(cfg.Blockchain.BroadcastNode, cfg.Node.NodeID, store, logger) + if err != nil { + return errors.Wrap(err, "Blockchain Node connector") + } + //The Server must have a valid ID before starting up svcPlugin := plugins.FindServicePlugin(cfg.Plugins.Service) if svcPlugin == nil { @@ -200,7 +209,7 @@ func startDaemon(args []string) error { defaultservice.WithDataStore(store), defaultservice.WithKeyStore(keyStore), defaultservice.WithIPFS(ipfsConnector), - defaultservice.WithMasterFiduciary(masterFiduciaryServer), + defaultservice.WithTendermint(tmConnector), defaultservice.WithConfig(cfg), ); err != nil { return errors.Errorf("init service plugin %s", cfg.Plugins.Service) @@ -223,23 +232,47 @@ func startDaemon(args []string) error { }, []string{"method", "success"}) // Stop chan + ctx, cancelContext := context.WithCancel(context.Background()) + errChan := make(chan error) - logger.Info("NODE ID (IPFS): %v", svcPlugin.NodeID()) logger.Info("Node Type: %v", strings.ToLower(cfg.Node.NodeType)) - endpoints := endpoints.Endpoints(svcPlugin, cfg.HTTP.CorsAllow, authorizer, logger, cfg.Node.NodeType, svcPlugin) - httpHandler := transport.NewHTTPHandler(endpoints, logger, duration) + logger.Info("Node ID: %v", svcPlugin.NodeID()) + logger.Info("Master Fiduciary: %v", svcPlugin.MasterFiduciaryNodeID()) //Connect to Blockchain - Tendermint - go tendermint.Subscribe(svcPlugin, store, logger, cfg.Node.NodeID, cfg.HTTP.ListenAddr) - if err != nil { - return errors.Wrap(err, "init Tendermint Blockchain") - } + go func() { + processFn := func(tx *api.BlockChainTX) error { + switch tx.Processor { + case "none": + return nil + case "dump": + svcPlugin.Dump(tx) + case "v1/fulfill/order": + svcPlugin.FulfillOrder(tx) + case "v1/order2": + svcPlugin.Order2(tx) + case "v1/fulfill/order/secret": + svcPlugin.FulfillOrderSecret(tx) + case "v1/order/secret2": + svcPlugin.OrderSecret2(tx) + default: + return errors.New("Unknown processor") + } + return nil + } + + logger.Info("Starting Blockchain listener to node: %v", cfg.Blockchain.BroadcastNode) + errChan <- tmConnector.Subscribe(ctx, processFn) + // errChan <- tendermint.Subscribe(svcPlugin, store, logger, cfg.Node.NodeID, cfg.HTTP.ListenAddr) + }() // Start the application http server go func() { - logger.Info("starting listener on %v, custody server %v", cfg.HTTP.ListenAddr, cfg.Node.MasterFiduciaryServer) - // httpHandler.PathPrefix("/api/").Handler(http.St:ripPrefix("/api/", http.FileServer(http.Dir("./swagger")))) + httpEndpoints := endpoints.Endpoints(svcPlugin, cfg.HTTP.CorsAllow, authorizer, logger, cfg.Node.NodeType, svcPlugin) + httpHandler := transport.NewHTTPHandler(httpEndpoints, logger, duration) + + logger.Info("Starting HTTP listener on %v", cfg.HTTP.ListenAddr) errChan <- http.ListenAndServe(cfg.HTTP.ListenAddr, httpHandler) }() @@ -247,7 +280,7 @@ func startDaemon(args []string) error { http.DefaultServeMux.Handle("/metrics", promhttp.Handler()) // Start the debug and metrics http server go func() { - logger.Info("starting metrics listener on %v", cfg.HTTP.MetricsAddr) + logger.Info("Starting metrics listener on %v", cfg.HTTP.MetricsAddr) errChan <- http.ListenAndServe(cfg.HTTP.MetricsAddr, http.DefaultServeMux) }() } @@ -260,7 +293,11 @@ func startDaemon(args []string) error { }() stopErr := <-errChan - _ = logger.Log("exit", stopErr.Error()) + if stopErr != nil { + _ = logger.Log("exit", stopErr.Error()) + } + + cancelContext() return store.Close() } diff --git a/cmd/servicetester/tendertest1.sh b/cmd/servicetester/tendertest1.sh index 64fc139..7f91702 100755 --- a/cmd/servicetester/tendertest1.sh +++ b/cmd/servicetester/tendertest1.sh @@ -1,8 +1,8 @@ -ref=$(curl -s -X POST "127.0.0.1:5556/v1/order1" -H "accept: */*" -H "Content-Type: application/json" -d "{\"beneficiaryIDDocumentCID\":\"\",\"extension\":{\"coin\":\"0\"}}") +ref=$(curl -s -X POST "127.0.0.1:5556/v1/order1" -H "accept: */*" -H "Content-Type: application/json" -d "{\"beneficiaryIDDocumentCID\":\"QmecffRZTSJDETCRLcjcPe7ynmYNyYpnh7WKzKTdmX1GBZ\",\"extension\":{\"coin\":\"0\"}}") #sleep long enough for blockchain to catch up sleep 4 -curl -X POST "127.0.0.1:5556/v1/order/secret1" -H "accept: */*" -H "Content-Type: application/json" -d "{\"orderReference\":$ref,\"beneficiaryIDDocumentCID\":\"QmcyJqEMqNEEYHrNSyUY83CQCNwZ5yVan3SgaQ4NchsqsC\"}" +curl -X POST "127.0.0.1:5556/v1/order/secret1" -H "accept: */*" -H "Content-Type: application/json" -d "{\"orderReference\":$ref,\"beneficiaryIDDocumentCID\":\"QmecffRZTSJDETCRLcjcPe7ynmYNyYpnh7WKzKTdmX1GBZ\"}" diff --git a/go.mod b/go.mod index 7e0150c..860eb9c 100644 --- a/go.mod +++ b/go.mod @@ -2,8 +2,6 @@ module github.com/apache/incubator-milagro-dta require ( github.com/TylerBrock/colorjson v0.0.0-20180527164720-95ec53f28296 - github.com/btcsuite/btcd v0.0.0-20190427004231-96897255fd17 - github.com/VividCortex/gohistogram v1.0.0 // indirect github.com/btcsuite/btcd v0.0.0-20190824003749-130ea5bddde3 github.com/btcsuite/btcutil v0.0.0-20190425235716-9e5f4b9a998d github.com/coreos/go-oidc v2.0.0+incompatible diff --git a/go.sum b/go.sum index 1ee5995..fd46e0f 100644 --- a/go.sum +++ b/go.sum @@ -16,8 +16,8 @@ github.com/TylerBrock/colorjson v0.0.0-20180527164720-95ec53f28296 h1:JYWTroLXcN github.com/TylerBrock/colorjson v0.0.0-20180527164720-95ec53f28296/go.mod h1:VSw57q4QFiWDbRnjdX8Cb3Ow0SFncRw+bA/ofY6Q83w= github.com/VividCortex/gohistogram v1.0.0 h1:6+hBz+qvs0JOrrNhhmR7lFxo5sINxBCGXrdtl/UvroE= github.com/VividCortex/gohistogram v1.0.0/go.mod h1:Pf5mBqqDxYaXu3hDrrU+w6nw50o/4+TcAqDqk/vUH7g= -github.com/Workiva/go-datastructures v1.0.50/go.mod h1:Z+F2Rca0qCsVYDS8z7bAGm8f3UkzuWYS/oBZz5a7VVA= github.com/VividCortex/gohistogram v1.0.0/go.mod h1:Pf5mBqqDxYaXu3hDrrU+w6nw50o/4+TcAqDqk/vUH7g= +github.com/Workiva/go-datastructures v1.0.50/go.mod h1:Z+F2Rca0qCsVYDS8z7bAGm8f3UkzuWYS/oBZz5a7VVA= github.com/aead/siphash v1.0.1 h1:FwHfE/T45KPKYuuSAKyyvE+oPWcaQ+CUmFW0bPlM+kg= github.com/aead/siphash v1.0.1/go.mod h1:Nywa3cDsYNNK3gaciGTWPwHt0wlpNV15vwmswBAUSII= github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= @@ -105,9 +105,9 @@ github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8 github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g= github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV9I= github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= +github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04= github.com/go-check/check v0.0.0-20180628173108-788fd7840127 h1:0gkP6mzaMqkmpcJYCFOLkIBwI7xFExG03bbkOkCvUPI= -github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= github.com/go-check/check v0.0.0-20180628173108-788fd7840127/go.mod h1:9ES+weclKsC9YodN5RgxqK/VD9HM9JsCSh7rNhMZE98= github.com/go-critic/go-critic v0.3.5-0.20190526074819-1df300866540/go.mod h1:+sE8vrLDS2M0pZkBk0wy6+nLdKexVDrl/jBqQOTDThA= github.com/go-kit/kit v0.8.0 h1:Wz+5lgoB0kkuqLEc6NVmwRknTKP6dTGbSqvhZtBI/j0= @@ -689,6 +689,7 @@ github.com/tendermint/go-amino v0.14.1 h1:o2WudxNfdLNBwMyl2dqOJxiro5rfrEaU0Ugs6o github.com/tendermint/go-amino v0.14.1/go.mod h1:i/UKE5Uocn+argJJBb12qTZsCDBcAYMbR92AaJVmKso= github.com/tendermint/tendermint v0.32.4 h1:KwZIMtT+ROvfMYO3wine6F9hak3SpngcRcAIzys1J3I= github.com/tendermint/tendermint v0.32.4/go.mod h1:D2+A3pNjY+Po72X0mTfaXorFhiVI8dh/Zg640FGyGtE= +github.com/tendermint/tendermint v0.32.5 h1:2hCLwuzfCKZxXSe/+iMEl+ChJWKJx6g/Wcvq3NMxVN4= github.com/tendermint/tm-db v0.2.0 h1:rJxgdqn6fIiVJZy4zLpY1qVlyD0TU6vhkT4kEf71TQQ= github.com/tendermint/tm-db v0.2.0/go.mod h1:0cPKWu2Mou3IlxecH+MEUSYc1Ch537alLe6CpFrKzgw= github.com/texttheater/golang-levenshtein v0.0.0-20180516184445-d188e65d659e/go.mod h1:XDKHRm5ThF8YJjx001LtgelzsoaEcvnA7lVWz9EeX3g= diff --git a/libs/documents/docs.go b/libs/documents/docs.go index 7e9fe27..59bad92 100644 --- a/libs/documents/docs.go +++ b/libs/documents/docs.go @@ -93,7 +93,7 @@ func DecodeIDDocument(rawdoc []byte, tag string, idDocument *IDDoc) error { return nil } -//PeekOrderDocument - look at the header inside an order document before decryption +//OrderDocumentSigner - look at the header inside an order document before decryption func OrderDocumentSigner(rawDoc []byte) (string, error) { signedEnvelope := SignedEnvelope{} err := proto.Unmarshal(rawDoc, &signedEnvelope) diff --git a/pkg/api/proto.go b/pkg/api/proto.go index aedcd2c..e3200d7 100644 --- a/pkg/api/proto.go +++ b/pkg/api/proto.go @@ -24,6 +24,8 @@ package api */ import ( + "crypto/sha256" + "encoding/hex" "time" ) @@ -51,6 +53,14 @@ type BlockChainTX struct { Tags map[string]string } +// CalcHash calculates, sets the TXhash and returns the string representation +func (tx *BlockChainTX) CalcHash() string { + txSha := sha256.Sum256(tx.Payload) + tx.TXhash = txSha[:] + return hex.EncodeToString(txSha[:]) + +} + //CreateIdentityRequest - type CreateIdentityRequest struct { Name string `json:"name,omitempty" validate:"required,alphanum"` diff --git a/pkg/common/chain.go b/pkg/common/chain.go index d7a7023..a0438b5 100644 --- a/pkg/common/chain.go +++ b/pkg/common/chain.go @@ -10,16 +10,8 @@ import ( ) // CreateTX creates the transaction ready for write to the chain -func CreateTX(nodeID string, store *datastore.Store, id string, order *documents.OrderDoc, recipients map[string]documents.IDDoc) ([]byte, []byte, error) { - secrets := &IdentitySecrets{} - if err := store.Get("id-doc", nodeID, secrets); err != nil { - return nil, nil, errors.New("load secrets from store") - } - blsSecretKey, err := hex.DecodeString(secrets.BLSSecretKey) - if err != nil { - return nil, nil, errors.Wrap(err, "Decode identity secrets") - } - rawDoc, err := documents.EncodeOrderDocument(nodeID, *order, blsSecretKey, "previousID", recipients) +func CreateTX(nodeID string, store *datastore.Store, blsSecretKey []byte, id string, order *documents.OrderDoc, recipients map[string]*documents.IDDoc) ([]byte, []byte, error) { + rawDoc, err := documents.EncodeOrderDocument(nodeID, *order, blsSecretKey, recipients) if err != nil { return nil, nil, errors.Wrap(err, "Failed to encode IDDocument") } @@ -32,7 +24,7 @@ func CreateTX(nodeID string, store *datastore.Store, id string, order *documents return TXID[:], rawDoc, nil } -//Decode a transaction for header data but don't decrypt it +//PeekTX Decode a transaction for header data but don't decrypt it func PeekTX(tx []byte) (string, error) { signerCID, err := documents.OrderDocumentSigner(tx) print(signerCID) diff --git a/pkg/common/common.go b/pkg/common/common.go index 4e8220b..cdca4fe 100644 --- a/pkg/common/common.go +++ b/pkg/common/common.go @@ -105,6 +105,7 @@ func RetrieveSeed(store *datastore.Store, reference string) (seedHex string, err return seedHex, nil } +//WriteOrderToStore stores an order func WriteOrderToStore(store *datastore.Store, orderReference string, address string) error { if err := store.Set("order", orderReference, address, map[string]string{"time": time.Now().UTC().Format(time.RFC3339)}); err != nil { return errors.New("Save Order to store") @@ -113,8 +114,8 @@ func WriteOrderToStore(store *datastore.Store, orderReference string, address st } // BuildRecipientList builds a list of recipients who are able to decrypt the encrypted envelope -func BuildRecipientList(ipfs ipfs.Connector, IDDocs ...string) (map[string]documents.IDDoc, error) { - recipients := make(map[string]documents.IDDoc) +func BuildRecipientList(ipfs ipfs.Connector, IDDocs ...string) (map[string]*documents.IDDoc, error) { + recipients := make(map[string]*documents.IDDoc) for _, v := range IDDocs { iddoc, err := RetrieveIDDocFromIPFS(ipfs, v) if err != nil { diff --git a/pkg/defaultservice/fulfillTX.go b/pkg/defaultservice/fulfillTX.go index b0c1025..b0527a9 100644 --- a/pkg/defaultservice/fulfillTX.go +++ b/pkg/defaultservice/fulfillTX.go @@ -25,9 +25,10 @@ import ( "github.com/apache/incubator-milagro-dta/libs/documents" "github.com/apache/incubator-milagro-dta/pkg/api" "github.com/apache/incubator-milagro-dta/pkg/common" - "github.com/apache/incubator-milagro-dta/pkg/tendermint" + "github.com/apache/incubator-milagro-dta/pkg/identity" ) +// FulfillOrder TX func (s *Service) FulfillOrder(tx *api.BlockChainTX) (string, error) { nodeID := s.NodeID() reqPayload := tx.Payload @@ -40,7 +41,16 @@ func (s *Service) FulfillOrder(tx *api.BlockChainTX) (string, error) { } remoteIDDocCID := signerID - _, _, _, sikeSK, err := common.RetrieveIdentitySecrets(s.Store, nodeID) + // SIKE and BLS keys + keyseed, err := s.KeyStore.Get("seed") + if err != nil { + return "", err + } + _, sikeSK, err := identity.GenerateSIKEKeys(keyseed) + if err != nil { + return "", err + } + _, blsSK, err := identity.GenerateBLSKeys(keyseed) if err != nil { return "", err } @@ -83,7 +93,7 @@ func (s *Service) FulfillOrder(tx *api.BlockChainTX) (string, error) { } //Create a new Transaction payload and TX - txHash, payload, err := common.CreateTX(nodeID, s.Store, nodeID, order, recipientList) + txHash, payload, err := common.CreateTX(nodeID, s.Store, blsSK, nodeID, order, recipientList) //Write the requests to the chain chainTX := &api.BlockChainTX{ @@ -95,8 +105,8 @@ func (s *Service) FulfillOrder(tx *api.BlockChainTX) (string, error) { TXhash: txHash, Tags: map[string]string{"reference": order.Reference, "txhash": hex.EncodeToString(txHash)}, } - return tendermint.PostToChain(chainTX, "FulfillOrder") + return s.Tendermint.PostTx(chainTX, "FulfillOrder") } // FulfillOrderSecret - @@ -112,7 +122,16 @@ func (s *Service) FulfillOrderSecret(tx *api.BlockChainTX) (string, error) { } remoteIDDocCID := signerID - _, _, _, sikeSK, err := common.RetrieveIdentitySecrets(s.Store, nodeID) + // SIKE and BLS keys + keyseed, err := s.KeyStore.Get("seed") + if err != nil { + return "", err + } + _, sikeSK, err := identity.GenerateSIKEKeys(keyseed) + if err != nil { + return "", err + } + _, blsSK, err := identity.GenerateBLSKeys(keyseed) if err != nil { return "", err } @@ -155,7 +174,7 @@ func (s *Service) FulfillOrderSecret(tx *api.BlockChainTX) (string, error) { } //Create a new Transaction payload and TX - txHash, payload, err := common.CreateTX(nodeID, s.Store, nodeID, order, recipientList) + txHash, payload, err := common.CreateTX(nodeID, s.Store, blsSK, nodeID, order, recipientList) //Write the requests to the chain chainTX := &api.BlockChainTX{ @@ -167,5 +186,6 @@ func (s *Service) FulfillOrderSecret(tx *api.BlockChainTX) (string, error) { Payload: payload, Tags: map[string]string{"reference": order.Reference, "txhash": hex.EncodeToString(txHash)}, } - return tendermint.PostToChain(chainTX, "FulfillOrderSecret") + + return s.Tendermint.PostTx(chainTX, "FulfillOrderSecret") } diff --git a/pkg/defaultservice/init.go b/pkg/defaultservice/init.go index f0c0190..e4874ae 100644 --- a/pkg/defaultservice/init.go +++ b/pkg/defaultservice/init.go @@ -24,8 +24,8 @@ import ( "github.com/apache/incubator-milagro-dta/libs/ipfs" "github.com/apache/incubator-milagro-dta/libs/keystore" "github.com/apache/incubator-milagro-dta/libs/logger" - "github.com/apache/incubator-milagro-dta/pkg/api" "github.com/apache/incubator-milagro-dta/pkg/config" + "github.com/apache/incubator-milagro-dta/pkg/tendermint" ) // ServiceOption function to set Service properties @@ -84,10 +84,18 @@ func WithIPFS(ipfsConnector ipfs.Connector) ServiceOption { } } +// WithTendermint adds tendermint node connector to the Service +func WithTendermint(tmConnector *tendermint.NodeConnector) ServiceOption { + return func(s *Service) error { + s.Tendermint = tmConnector + return nil + } +} + // WithMasterFiduciary adds master fiduciary connector to the Service -func WithMasterFiduciary(masterFiduciaryServer api.ClientService) ServiceOption { +func WithMasterFiduciary(masterFiduciaryNodeID string) ServiceOption { return func(s *Service) error { - s.MasterFiduciaryServer = masterFiduciaryServer + s.SetMasterFiduciaryNodeID(masterFiduciaryNodeID) return nil } } diff --git a/pkg/defaultservice/order.go b/pkg/defaultservice/order.go index acc6a6d..3f60158 100644 --- a/pkg/defaultservice/order.go +++ b/pkg/defaultservice/order.go @@ -27,7 +27,6 @@ import ( "github.com/apache/incubator-milagro-dta/pkg/api" "github.com/apache/incubator-milagro-dta/pkg/common" "github.com/apache/incubator-milagro-dta/pkg/identity" - "github.com/apache/incubator-milagro-dta/pkg/tendermint" "github.com/pkg/errors" ) @@ -111,8 +110,6 @@ func (s *Service) PrepareOrderPart1(order *documents.OrderDoc, reqExtension map[ } // PrepareOrderResponse gets the updated order and returns the commitment and extension -//func (s *Service) PrepareOrderResponse(orderPart2 *documents.OrderDoc, reqExtension, fulfillExtension map[string]string) (commitment string, extension map[string]string, err error) { - func (s *Service) PrepareOrderResponse(orderPart2 *documents.OrderDoc) (commitment string, extension map[string]string, err error) { return orderPart2.OrderPart2.CommitmentPublicKey, nil, nil } @@ -158,8 +155,18 @@ func (s *Service) Order1(req *api.OrderRequest) (string, error) { order.OrderReqExtension[key] = value } + // BLS key + keyseed, err := s.KeyStore.Get("seed") + if err != nil { + return "", err + } + _, blsSK, err := identity.GenerateBLSKeys(keyseed) + if err != nil { + return "", err + } + //This is serialized and output to the chain - txHash, payload, err := common.CreateTX(nodeID, s.Store, nodeID, order, recipientList) + txHash, payload, err := common.CreateTX(nodeID, s.Store, blsSK, nodeID, order, recipientList) //Write the requests to the chain chainTX := &api.BlockChainTX{ @@ -171,7 +178,11 @@ func (s *Service) Order1(req *api.OrderRequest) (string, error) { TXhash: txHash, Tags: map[string]string{"reference": order.Reference, "txhash": hex.EncodeToString(txHash)}, } - tendermint.PostToChain(chainTX, "Order1") + + if _, err := s.Tendermint.PostTx(chainTX, "Order1"); err != nil { + return "", err + } + return order.Reference, nil } @@ -196,11 +207,11 @@ func (s *Service) OrderSecret1(req *api.OrderSecretRequest) (string, error) { // SIKE and BLS keys keyseed, err := s.KeyStore.Get("seed") if err != nil { - return nil, err + return "", err } _, sikeSK, err := identity.GenerateSIKEKeys(keyseed) if err != nil { - return nil, err + return "", err } _, blsSK, err := identity.GenerateBLSKeys(keyseed) if err != nil { @@ -212,7 +223,7 @@ func (s *Service) OrderSecret1(req *api.OrderSecretRequest) (string, error) { return "", err } - tx, err := tendermint.TXbyHash(previousOrderHash) + tx, err := s.Tendermint.GetTx(previousOrderHash) if err != nil { return "", err } @@ -258,7 +269,7 @@ func (s *Service) OrderSecret1(req *api.OrderSecretRequest) (string, error) { Timestamp: time.Now().Unix(), } - txHash, payload, err := common.CreateTX(nodeID, s.Store, nodeID, order, recipientList) + txHash, payload, err := common.CreateTX(nodeID, s.Store, blsSK, nodeID, order, recipientList) //Write the requests to the chain chainTX := &api.BlockChainTX{ @@ -269,6 +280,10 @@ func (s *Service) OrderSecret1(req *api.OrderSecretRequest) (string, error) { Payload: payload, Tags: map[string]string{"reference": order.Reference, "txhash": hex.EncodeToString(txHash)}, } - tendermint.PostToChain(chainTX, "OrderSecret1") + + if _, err := s.Tendermint.PostTx(chainTX, "OrderSecret1"); err != nil { + return "", err + } + return order.Reference, nil } diff --git a/pkg/defaultservice/orderTX.go b/pkg/defaultservice/orderTX.go index 4fe62f4..d6188c9 100644 --- a/pkg/defaultservice/orderTX.go +++ b/pkg/defaultservice/orderTX.go @@ -19,11 +19,12 @@ package defaultservice import ( "encoding/hex" + "fmt" "github.com/apache/incubator-milagro-dta/libs/documents" "github.com/apache/incubator-milagro-dta/pkg/api" "github.com/apache/incubator-milagro-dta/pkg/common" - "github.com/apache/incubator-milagro-dta/pkg/tendermint" + "github.com/apache/incubator-milagro-dta/pkg/identity" "github.com/pkg/errors" ) @@ -38,7 +39,16 @@ func (s *Service) Order2(tx *api.BlockChainTX) (string, error) { return "", err } - _, _, _, sikeSK, err := common.RetrieveIdentitySecrets(s.Store, nodeID) + // SIKE and BLS keys + keyseed, err := s.KeyStore.Get("seed") + if err != nil { + return "", err + } + _, sikeSK, err := identity.GenerateSIKEKeys(keyseed) + if err != nil { + return "", err + } + _, blsSK, err := identity.GenerateBLSKeys(keyseed) if err != nil { return "", err } @@ -73,7 +83,7 @@ func (s *Service) Order2(tx *api.BlockChainTX) (string, error) { } //Generate a transaction - txHash, payload, err := common.CreateTX(nodeID, s.Store, nodeID, order, recipientList) + txHash, payload, err := common.CreateTX(nodeID, s.Store, blsSK, nodeID, order, recipientList) //Write the Order2 results to the chain chainTX := &api.BlockChainTX{ @@ -84,8 +94,8 @@ func (s *Service) Order2(tx *api.BlockChainTX) (string, error) { Payload: payload, Tags: map[string]string{"reference": order.Reference, "txhash": hex.EncodeToString(txHash)}, } - return tendermint.PostToChain(chainTX, "Order2") + return s.Tendermint.PostTx(chainTX, "Order2") } // OrderSecret2 - Process an incoming Blockchain Order/Secret transaction from a MasterFiduciary, to generate the final secret @@ -94,7 +104,16 @@ func (s *Service) OrderSecret2(tx *api.BlockChainTX) (string, error) { reqPayload := tx.Payload txHashString := hex.EncodeToString(tx.TXhash) - _, _, _, sikeSK, err := common.RetrieveIdentitySecrets(s.Store, nodeID) + // SIKE and BLS keys + keyseed, err := s.KeyStore.Get("seed") + if err != nil { + return "", err + } + _, sikeSK, err := identity.GenerateSIKEKeys(keyseed) + if err != nil { + return "", err + } + _, blsSK, err := identity.GenerateBLSKeys(keyseed) if err != nil { return "", err } @@ -107,17 +126,16 @@ func (s *Service) OrderSecret2(tx *api.BlockChainTX) (string, error) { //Decode the Order from the supplied TX order := &documents.OrderDoc{} err = documents.DecodeOrderDocument(reqPayload, txHashString, order, sikeSK, nodeID, remoteIDDoc.BLSPublicKey) + if err != nil { + fmt.Println("ERROR DEcode Order:", err) + return "", err + } if order.BeneficiaryCID != nodeID { return "", errors.New("Invalid Processor") } - _, seed, _, sikeSK, err := common.RetrieveIdentitySecrets(s.Store, nodeID) - if err != nil { - return "", err - } - - finalPrivateKey, _, extension, err := s.Plugin.ProduceFinalSecret(seed, sikeSK, order, order, nodeID) + finalPrivateKey, _, extension, err := s.Plugin.ProduceFinalSecret(keyseed, sikeSK, order, order, nodeID) if err != nil { return "", err } @@ -140,7 +158,7 @@ func (s *Service) OrderSecret2(tx *api.BlockChainTX) (string, error) { if err != nil { return "", err } - txHash, payload, err := common.CreateTX(nodeID, s.Store, nodeID, order, recipientList) + txHash, payload, err := common.CreateTX(nodeID, s.Store, blsSK, nodeID, order, recipientList) //Write the requests to the chain chainTX := &api.BlockChainTX{ @@ -151,5 +169,6 @@ func (s *Service) OrderSecret2(tx *api.BlockChainTX) (string, error) { Payload: payload, Tags: map[string]string{"reference": order.Reference, "txhash": hex.EncodeToString(txHash)}, } - return tendermint.PostToChain(chainTX, "OrderSecret2") + + return s.Tendermint.PostTx(chainTX, "OrderSecret2") } diff --git a/pkg/defaultservice/service.go b/pkg/defaultservice/service.go index 9299b92..2b9fe0d 100644 --- a/pkg/defaultservice/service.go +++ b/pkg/defaultservice/service.go @@ -34,7 +34,8 @@ import ( "github.com/apache/incubator-milagro-dta/libs/transport" "github.com/apache/incubator-milagro-dta/pkg/api" "github.com/apache/incubator-milagro-dta/pkg/common" - "github.com/apache/incubator-milagro-dta/pkg/config" + "github.com/apache/incubator-milagro-dta/pkg/identity" + "github.com/apache/incubator-milagro-dta/pkg/tendermint" "github.com/hokaccha/go-prettyjson" ) @@ -52,6 +53,7 @@ type Service struct { Store *datastore.Store KeyStore keystore.Store Ipfs ipfs.Connector + Tendermint *tendermint.NodeConnector nodeID string masterFiduciaryNodeID string } @@ -115,7 +117,12 @@ func (s *Service) Dump(tx *api.BlockChainTX) error { return err } - _, _, _, sikeSK, err := common.RetrieveIdentitySecrets(s.Store, nodeID) + // SIKE and BLS keys + keyseed, err := s.KeyStore.Get("seed") + if err != nil { + return err + } + _, sikeSK, err := identity.GenerateSIKEKeys(keyseed) if err != nil { return err } @@ -127,6 +134,8 @@ func (s *Service) Dump(tx *api.BlockChainTX) error { fmt.Println(string(pp)) return nil +} + // Endpoints for extending the plugin endpoints func (s *Service) Endpoints() (namespace string, endpoints transport.HTTPEndpoints) { return s.Name(), nil diff --git a/pkg/tendermint/cmd/cmd b/pkg/tendermint/cmd/cmd deleted file mode 100755 index 0e07c4f..0000000 Binary files a/pkg/tendermint/cmd/cmd and /dev/null differ diff --git a/pkg/tendermint/config.go b/pkg/tendermint/config.go deleted file mode 100644 index 510189d..0000000 --- a/pkg/tendermint/config.go +++ /dev/null @@ -1,6 +0,0 @@ -package tendermint - -const ( - //node = "127.0.0.1:26657" - node = "34.246.173.153:26657" -) diff --git a/pkg/tendermint/connector.go b/pkg/tendermint/connector.go new file mode 100644 index 0000000..0dab29e --- /dev/null +++ b/pkg/tendermint/connector.go @@ -0,0 +1,245 @@ +package tendermint + +import ( + "context" + "encoding/base64" + "encoding/json" + "fmt" + "io/ioutil" + "net/http" + "strconv" + "strings" + "time" + + "github.com/apache/incubator-milagro-dta/libs/datastore" + "github.com/apache/incubator-milagro-dta/libs/logger" + "github.com/apache/incubator-milagro-dta/pkg/api" + status "github.com/apache/incubator-milagro-dta/pkg/tendermint/status" + "github.com/pkg/errors" + tmclient "github.com/tendermint/tendermint/rpc/client" + tmtypes "github.com/tendermint/tendermint/types" +) + +const ( + nodeConnectionTimeout = time.Second * 10 + txChanSize = 1000 +) + +// ProcessTXFunc is executed on each incoming TX +type ProcessTXFunc func(tx *api.BlockChainTX) error + +// NodeConnector is using external tendermint node to post and get transactions +type NodeConnector struct { + nodeID string + tmNodeAddr string + httpClient *http.Client + tmClient *tmclient.HTTP + log *logger.Logger + store *datastore.Store +} + +// NewNodeConnector constructs a new Tendermint NodeConnector +func NewNodeConnector(tmNodeAddr string, nodeID string, store *datastore.Store, log *logger.Logger) (conn *NodeConnector, err error) { + defer func() { + if r := recover(); r != nil { + err = errors.Errorf("Initialize tendermint node connector: %v", r) + } + }() + + tmNodeAddr = strings.TrimRight(tmNodeAddr, "/") + tmClient := tmclient.NewHTTP(fmt.Sprintf("tcp://%s", tmNodeAddr), "/websocket") + if err := tmClient.Start(); err != nil { + return nil, errors.Wrap(err, "Start tendermint client") + } + + return &NodeConnector{ + tmNodeAddr: tmNodeAddr, + nodeID: nodeID, + log: log, + store: store, + httpClient: &http.Client{ + Timeout: nodeConnectionTimeout, + }, + tmClient: tmClient, + }, nil + +} + +// Stop is performing clean-up +func (nc *NodeConnector) Stop() error { + return nc.tmClient.Stop() +} + +// GetTx retreives a transaction by hash +func (nc *NodeConnector) GetTx(txHash string) (*api.BlockChainTX, error) { + query := fmt.Sprintf("tag.txhash='%s'", txHash) + result, err := nc.tmClient.TxSearch(query, true, 1, 1) + if err != nil { + return nil, err + } + if len(result.Txs) == 0 { + return nil, errors.New("Transaction not found") + } + + payload := &api.BlockChainTX{} + if err := json.Unmarshal(result.Txs[0].Tx, &payload); err != nil { + return nil, err + } + + return payload, nil +} + +// PostTx posts a transaction to the chain and returns the transaction ID +func (nc *NodeConnector) PostTx(tx *api.BlockChainTX, method string) (txID string, err error) { + txID = tx.CalcHash() + + //serialize the whole transaction + serializedTX, err := json.Marshal(tx) + if err != nil { + return + } + base64EncodedTX := base64.StdEncoding.EncodeToString(serializedTX) + + // TODO: use net/rpc + body := strings.NewReader(`{ + "jsonrpc": "2.0", + "id": "anything", + "method": "broadcast_tx_commit", + "params": { + "tx": "` + base64EncodedTX + `"} + }`) + url := "http://" + nc.tmNodeAddr + + req, err := http.NewRequest("POST", url, body) + if err != nil { + return "", errors.Wrap(err, "post to blockchain node") + } + req.Header.Set("Content-Type", "text/plain;") + + resp, err := nc.httpClient.Do(req) + if err != nil { + return "", errors.Wrap(err, "post to blockchain node") + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + var respErr string + if b, err := ioutil.ReadAll(resp.Body); err != nil { + respErr = resp.Status + } else { + respErr = string(b) + } + + return "", errors.Errorf("Post to blockchain node status %v: %v", resp.StatusCode, respErr) + } + + nc.log.Debug("POST TO CHAIN: METHOD: %s CALLS: %s - TXID: %s", method, tx.Processor, txID) + + return +} + +// Subscribe connects to the Tendermint node and collect the events +func (nc *NodeConnector) Subscribe(ctx context.Context, processFn ProcessTXFunc) error { + chainStatus, err := nc.getChainStatus() + if err != nil { + return err + } + + currentBlockHeight, err := strconv.Atoi(chainStatus.Result.SyncInfo.LatestBlockHeight) + if err != nil { + return errors.Wrap(err, "Failed to obtain latest blockheight of Blockchain") + } + + var processedToHeight int + if err := nc.store.Get("chain", "height", &processedToHeight); err != nil { + if err != datastore.ErrKeyNotFound { + return errors.Wrap(err, "Get last processed block height") + } + } + + nc.log.Debug("Block height: Current: %v; Processed: %v", currentBlockHeight, processedToHeight) + + // create the transaction queue chan + txQueue := make(chan *api.BlockChainTX, txChanSize) + + // Collect events + if err := nc.subscribeAndQueue(ctx, txQueue); err != nil { + return err + } + + // TODO: load historicTX + + // Process events + return nc.processTXQueue(ctx, txQueue, processFn) +} + +func (nc *NodeConnector) subscribeAndQueue(ctx context.Context, txQueue chan *api.BlockChainTX) error { + query := "tag.recipient='" + nc.nodeID + "'" + + out, err := nc.tmClient.Subscribe(context.Background(), "test", query, 1000) + if err != nil { + return errors.Wrapf(err, "Failed to subscribe to query %s", query) + } + + go func() { + for { + select { + case result := <-out: + tx := result.Data.(tmtypes.EventDataTx).Tx + payload := &api.BlockChainTX{} + err := json.Unmarshal(tx, payload) + if err != nil { + nc.log.Debug("IGNORED TX - Invalid!") + break + } + + //check if this node is in receipient list + if payload.RecipientID != nc.nodeID { + nc.log.Debug("IGNORED TX! Recipient not match the query! (%v != %v)", payload.RecipientID, nc.nodeID) + break + } + + //Add into the waitingQueue for later processing + txQueue <- payload + case <-ctx.Done(): + return + } + } + }() + + return nil +} + +func (nc *NodeConnector) processTXQueue(ctx context.Context, txQueue chan *api.BlockChainTX, processFn ProcessTXFunc) error { + for { + select { + case tx := <-txQueue: + if err := processFn(tx); err != nil { + // TODO: errors block processing the queue + return err + } + // TODO: store the last block height + case <-ctx.Done(): + return nil + } + } +} + +func (nc *NodeConnector) getChainStatus() (*status.StatusResponse, error) { + url := fmt.Sprintf("http://%s/status", nc.tmNodeAddr) + resp, err := nc.httpClient.Get(url) + if err != nil { + return nil, errors.Wrap(err, "Get node status") + } + defer resp.Body.Close() + if resp.StatusCode != http.StatusOK { + return nil, errors.Errorf("Get node status status code: %v", resp.StatusCode) + } + + status := &status.StatusResponse{} + if err := json.NewDecoder(resp.Body).Decode((&status)); err != nil { + return nil, errors.Wrap(err, "Invalid node status response") + } + + return status, nil +} diff --git a/pkg/tendermint/tendermint.go b/pkg/tendermint/tendermint.go deleted file mode 100644 index e02b3c1..0000000 --- a/pkg/tendermint/tendermint.go +++ /dev/null @@ -1,165 +0,0 @@ -package tendermint - -import ( - "bufio" - "crypto/sha256" - "encoding/base64" - "encoding/hex" - "encoding/json" - "errors" - "fmt" - "net/http" - "strings" - - "github.com/apache/incubator-milagro-dta/pkg/api" - "github.com/apache/incubator-milagro-dta/pkg/service" -) - -//QueryChain the blockchain for an index -func QueryChain(index string) (string, string) { - url := "http://" + node + "/abci_query?data=\"" + index + "\"" - resp, err := http.Get(url) - if err != nil { - // handle err - } - defer resp.Body.Close() - scanner := bufio.NewScanner(resp.Body) - scanner.Split(bufio.ScanBytes) - t := "" - for scanner.Scan() { - t += scanner.Text() - ///fmt.Print(scanner.Text()) - } - - res, _ := UnmarshalChainQuery([]byte(t)) - - val := res.Result.Response.Value - decodeVal, _ := base64.StdEncoding.DecodeString(val) - return string(decodeVal), val -} - -//PostToChain - send TX data to the Blockchain -func PostToChain(tx *api.BlockChainTX, method string) (string, error) { - //Create TX Hash - - tx.RecipientID = tx.RecipientID - - TXID := sha256.Sum256(tx.Payload) - TXIDhex := hex.EncodeToString(TXID[:]) - tx.TXhash = TXID[:] - - //serialize the whole transaction - serializedTX, _ := json.Marshal(tx) - base64EncodedTX := base64.StdEncoding.EncodeToString(serializedTX) - - body := strings.NewReader("{\"jsonrpc\":\"2.0\",\"id\":\"anything\",\"method\":\"broadcast_tx_commit\",\"params\": {\"tx\": \"" + base64EncodedTX + "\"}}") - url := "http://" + node + "" - - req, err := http.NewRequest("POST", url, body) - if err != nil { - print("Error posting to Blockchain") - return "", err - } - req.Header.Set("Content-Type", "text/plain;") - - resp, err := http.DefaultClient.Do(req) - if err != nil { - print("Error posting to Blockchain") - return "", err - } - defer resp.Body.Close() - fmt.Printf("POST TO CHAIN: METHOD:%s CALLS:%s - TXID:%s\n", method, tx.Processor, TXIDhex) - return TXIDhex, nil -} - -//HandleChainTX - -func HandleChainTX(myID string, tx string) error { - blockChainTX, err := decodeChainTX(tx) - if err != nil { - return err - } - panic(nil) - err = callNextTX(nil, blockChainTX, "5556") - if err != nil { - return err - } - return nil -} - -//DecodeChainTX - Decode the On Chain TX into a BlockChainTX object -func decodeChainTX(payload string) (*api.BlockChainTX, error) { - base64DecodedTX, _ := base64.StdEncoding.DecodeString(payload) - tx := &api.BlockChainTX{} - - err := json.Unmarshal(base64DecodedTX, tx) - if err != nil { - return &api.BlockChainTX{}, err - } - return tx, nil -} - -//DecodeChainTX - Decode the On Chain TX into a BlockChainTX object -func decodeTX(payload string) (*api.BlockChainTX, string, error) { - tx := &api.BlockChainTX{} - parts := strings.SplitN(payload, "=", 2) - if len(parts) != 2 { - return &api.BlockChainTX{}, "", errors.New("Invalid TX payload") - } - hash := string(parts[0]) - err := json.Unmarshal([]byte(parts[1]), tx) - if err != nil { - return &api.BlockChainTX{}, "", err - } - return tx, hash, nil -} - -func callNextTX(svc service.Service, tx *api.BlockChainTX, listenPort string) error { - switch tx.Processor { - case "none": - return nil - case "dump": - svc.Dump(tx) - case "v1/fulfill/order": - svc.FulfillOrder(tx) - case "v1/order2": - svc.Order2(tx) - case "v1/fulfill/order/secret": - svc.FulfillOrderSecret(tx) - case "v1/order/secret2": - svc.OrderSecret2(tx) - - default: - return errors.New("Unknown processor") - } - return nil -} - -//DumpTXID - -func DumpTXID(txid string) { - value, raw := QueryChain(txid) - println(value) - bc, _ := decodeChainTX(raw) - println(string(bc.Payload)) - println() -} - -//ProcessTransactionID - -func ProcessTransactionID(txid string) { - _, payload := QueryChain((txid)) - err := HandleChainTX("", payload) - if err != nil { - panic(err) - } -} - -func unique(stringSlice []string) []string { - keys := make(map[string]bool) - list := []string{} - for _, entry := range stringSlice { - if _, value := keys[entry]; !value { - keys[entry] = true - list = append(list, entry) - } - } - return list -} diff --git a/pkg/tendermint/tendermint_test.go b/pkg/tendermint/tendermint_test.go deleted file mode 100644 index 495f74f..0000000 --- a/pkg/tendermint/tendermint_test.go +++ /dev/null @@ -1,60 +0,0 @@ -package tendermint - -import "testing" - -var ( - nodeID = "QmT4y4MtV5mvPHkFjfUQYQ7h1WvAagMy2GTJCn2bF8DQb7" -) - -func Test_Order1(t *testing.T) { - a := "eyJQcm9jZXNzb3IiOiJ2MS9mdWxmaWxsL29yZGVyIiwiU2VuZGVySUQiOiJRbVQ0eTRNdFY1bXZQSGtGamZVUVlRN2gxV3ZBYWdNeTJHVEpDbjJiRjhEUWI3IiwiUmVjaXBpZW50SUQiOiJRbVQ0eTRNdFY1bXZQSGtGamZVUVlRN2gxV3ZBYWdNeTJHVEpDbjJiRjhEUWI3IiwiUGF5bG9hZCI6ImV5SnZjbVJsY2xCaGNuUXhRMGxFSWpvaVVXMVpVRU5xVEVGME1tbzVVbWhxU0U1TVkwRnVObEF5WTJseVJHWjZTRlpFWTBwMFkzbGtUVFZ5VWxoM1V5SXNJbVJ2WTNWdFpXNTBRMGxFSWpvaVVXMVVOSGswVFhSV05XMTJVRWhyUm1wbVZWRlpVVGRvTVZkMlFXRm5UWGt5UjFSS1EyNHlZa1k0UkZGaU55SjkifQ==" - err := HandleChainTX(nodeID, a) - if err != nil { - panic(err) - } -} - -func Test_FullFill(t *testing.T) { - a := "eyJQcm9jZXNzb3IiOiJPUkRFUl9SRVNQT05TRSIsIlNlbmRlcklEIjoiUW1UNHk0TXRWNW12UEhrRmpmVVFZUTdoMVd2QWFnTXkyR1RKQ24yYkY4RFFiNyIsIlJlY2lwaWVudElEIjoiUW1UNHk0TXRWNW12UEhrRmpmVVFZUTdoMVd2QWFnTXkyR1RKQ24yYkY4RFFiNyIsIlBheWxvYWQiOiJleUp2Y21SbGNsQmhjblF5UTBsRUlqb2lVVzFVZUZka1ltZEdhRGxHYWpGMlJIbFhlazVCWkROVmFuRjNlVEYyTkRsRlFtVjJhRzUyTVVWdk5HVllSaUo5In0=" - err := HandleChainTX(nodeID, a) - if err != nil { - panic(err) - } - -} - -func Test_DumpTXID(t *testing.T) { - a := "5fe5823c0d8b6d49f2ac99c90575566962ac3a14a6b2f1e7fe7ea1099b7b3bbd" - value, raw := QueryChain(a) - println(value) - bc, _ := decodeChainTX(raw) - print(string(bc.Payload)) -} - -//Use this to generate Order1 -//curl -s -X POST "http://localhost:5556/v1/order1" -H "accept: */*" -H "Content-Type: application/json" -d "{\"beneficiaryIDDocumentCID\":\"\",\"extension\":{\"coin\":\"0\"}}" - -func Test_All(t *testing.T) { - DumpTXID("dea1396bce7890f85da7dc86b4ece5c4d372886ed08948eca6a0beee36c412e0") - -} - -func Test_1(t *testing.T) { - txid := "473407b069ff917b110f38c36d5b9e5246b5ace5d82df38c5a188d5ac868cfec" - DumpTXID(txid) - ProcessTransactionID(txid) -} - -func Test_2(t *testing.T) { - txid := "586bc14b15a31999571c8188241beef046d3b78a9481ecee984e7c76a1d95112" - DumpTXID(txid) - ProcessTransactionID(txid) -} - -func Test_3(t *testing.T) { - txid := "5a48129fd272f2a8c57fdd96716a78c3be55a3cf811b179e82e54221d95ccbc4" - DumpTXID(txid) - ProcessTransactionID(txid) -} - -//curl -s -X POST "http://localhost:5556/v1/order1" -H "accept: */*" -H "Content-Type: application/json" -d "{\"beneficiaryIDDocumentCID\":\"\",\"extension\":{\"coin\":\"0\"}}" diff --git a/pkg/tendermint/websockets.go b/pkg/tendermint/websockets.go deleted file mode 100644 index 472ea95..0000000 --- a/pkg/tendermint/websockets.go +++ /dev/null @@ -1,204 +0,0 @@ -package tendermint - -import ( - "context" - "encoding/json" - "fmt" - "net/http" - "os" - "os/signal" - "strconv" - "syscall" - - "github.com/apache/incubator-milagro-dta/libs/datastore" - "github.com/apache/incubator-milagro-dta/libs/logger" - "github.com/apache/incubator-milagro-dta/pkg/api" - "github.com/apache/incubator-milagro-dta/pkg/service" - status "github.com/apache/incubator-milagro-dta/pkg/tendermint/status" - "github.com/pkg/errors" - tmclient "github.com/tendermint/tendermint/rpc/client" - ctypes "github.com/tendermint/tendermint/rpc/core/types" - tmtypes "github.com/tendermint/tendermint/types" -) - -func catchUp(quene chan tmtypes.Tx, store *datastore.Store, logger *logger.Logger, nodeID string, listenPort string, height int) error { - print("catch up") - return nil -} - -//Subscribe to Websocket and add to queue -func subscribeAndQueue(queueWaiting chan api.BlockChainTX, logger *logger.Logger, nodeID string, listenPort string, blockchainNode string) error { - client := tmclient.NewHTTP("tcp://"+blockchainNode+"", "/websocket") - //client.SetLogger(tmlogger) - err := client.Start() - if err != nil { - logger.Info("Failed to start Tendermint HTTP client %s", err) - return err - } - defer client.Stop() - - //curl "34.246.173.153:26657/tx_search?query=\"tag.part=4%20AND%20tag.reference='579a2864-e100-11e9-aaf4-acde48001122'\"" - query := "tag.recipient='" + nodeID + "'" - //query := "tm.event = 'Tx'" - - out, err := client.Subscribe(context.Background(), "test", query, 1000) - if err != nil { - logger.Info("Failed to subscribe to query %s %s", query, err) - return err - } - - logger.Info("Tendermint: Connected") - - quit := make(chan os.Signal, 1) - signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM) - for { - select { - case result := <-out: - tx := result.Data.(tmtypes.EventDataTx).Tx - payload := api.BlockChainTX{} - err := json.Unmarshal(tx, &payload) - if err != nil { - logger.Info("******** Invalid TX - ignored") - break - } - - //check if this node is in receipient list - if payload.RecipientID != nodeID { - logger.Info("******** Invalid Recipient - why are we receiving this TX?") - break - - } - - //Add into the waitingQueue for later processing - queueWaiting <- payload - //fmt.Printf("Incoming Transaction:%d \n", len(queueWaiting)) - - case <-quit: - os.Exit(0) - } - } - return nil -} - -func TXbyHash(TXHash string) (api.BlockChainTX, error) { - client := tmclient.NewHTTP("tcp://"+node+"", "/websocket") - query := fmt.Sprintf("tag.txhash='%s'", TXHash) - result, err := client.TxSearch(query, true, 1, 1) - - if len(result.Txs) == 0 { - return api.BlockChainTX{}, errors.New("Not found") - } - - payload := api.BlockChainTX{} - err = json.Unmarshal(result.Txs[0].Tx, &payload) - - _ = payload - - if err != nil { - return payload, err - } - // - // res := result.Txs[0] - // tx := res.Tx - return payload, nil - -} - -//loadAllHistoricTX - load the history for this node into a queue -func loadAllHistoricTX(start int, end int, txHistory []ctypes.ResultTx, nodeID string, listenPort string) error { - //cycle through the historic transactions page by page - //Get all transactions that claim to be from me - check signatures - //Get all transactions that claim to be to me - - - client := tmclient.NewHTTP("tcp://"+node+"", "/websocket") - currentPage := 1 - query := fmt.Sprintf("tag.recipient='%v' AND tag.sender='%v' AND tx.height>%d AND tx.height<=%d", nodeID, nodeID, start, end) - numPerPage := 5 - - for { - result, err := client.TxSearch(query, true, currentPage, numPerPage) - if err != nil { - return errors.New("Failed to query chain for transaction history") - } - - for _, tx := range result.Txs { - txHistory = append(txHistory, *tx) - } - if currentPage*numPerPage > result.TotalCount { - break - } - currentPage++ - } - parseHistory(txHistory) - return nil -} - -func parseHistory(txHistory []ctypes.ResultTx) { - txCount := len(txHistory) - - //loop backwards - for i := txCount - 1; i >= 0; i-- { - resTx := txHistory[i] - tx := resTx.Tx - - //Decode TX into BlockchainTX Object - payload := api.BlockChainTX{} - err := json.Unmarshal(tx, &payload) - if err != nil { - msg := fmt.Sprintf("Invalid Transaction Hash:%v Height:%v Index:% \n", resTx.Hash, resTx.Height, resTx.Index) - print(msg) - continue - } - //Decode BlockchainTX.payload into Protobuffer Qredo - // TODO: - // Parse the incoming TX, check sig - // If from self, can assume correct - // builds transaction chains using previous transactionHash - // Ensure every - // Check recipient/sender in tags are correct - // - _ = payload - } - print("Finished loading - but not parsing the History\n") -} - -func processTXQueue(svc service.Service, queue chan api.BlockChainTX, listenPort string) { - print("Processing queue\n") - for payload := range queue { - callNextTX(svc, &payload, listenPort) - } -} - -//Subscribe - Connect to the Tendermint websocket to collect events -func Subscribe(svc service.Service, store *datastore.Store, logger *logger.Logger, nodeID string, listenPort string) error { - - latestStatus, _ := getChainStatus(node) - currentBlockHeight, err := strconv.Atoi(latestStatus.Result.SyncInfo.LatestBlockHeight) - - if err != nil { - return errors.New("Failed to obtain latest blockheight of Blockchain") - } - - var processedToHeight int - store.Get("chain", "height", &processedToHeight) - - //first catch up to Tip of chain - var txHistory []ctypes.ResultTx - queueWaiting := make(chan api.BlockChainTX, 1000) - - //while we are processessing the history save all new transactions in a queue for later - go subscribeAndQueue(queueWaiting, logger, nodeID, listenPort, node) - loadAllHistoricTX(processedToHeight, currentBlockHeight, txHistory, nodeID, listenPort) - processTXQueue(svc, queueWaiting, listenPort) - return nil -} - -func getChainStatus(node string) (status.StatusResponse, error) { - resp, err := http.Get("http://" + node + "/status") - result := status.StatusResponse{} - if err != nil { - return result, err - } - json.NewDecoder(resp.Body).Decode((&result)) - return result, nil -}
