This is an automated email from the ASF dual-hosted git repository.

smihaylov pushed a commit to branch tendermint
in repository https://gitbox.apache.org/repos/asf/incubator-milagro-dta.git

commit 4f81f8ccf9e65e10b1d31570f5748d29032039fa
Author: Stanislav Mihaylov <[email protected]>
AuthorDate: Mon Oct 7 15:38:09 2019 +0300

    Refactor Tendermint package
---
 cmd/service/commands.go           |   1 +
 cmd/service/main.go               |  63 ++++++++--
 cmd/servicetester/tendertest1.sh  |   4 +-
 go.mod                            |   2 -
 go.sum                            |   5 +-
 libs/documents/docs.go            |   2 +-
 pkg/api/proto.go                  |  10 ++
 pkg/common/chain.go               |  14 +--
 pkg/common/common.go              |   5 +-
 pkg/defaultservice/fulfillTX.go   |  34 ++++--
 pkg/defaultservice/init.go        |  14 ++-
 pkg/defaultservice/order.go       |  35 ++++--
 pkg/defaultservice/orderTX.go     |  45 +++++--
 pkg/defaultservice/service.go     |  13 +-
 pkg/tendermint/cmd/cmd            | Bin 10230244 -> 0 bytes
 pkg/tendermint/config.go          |   6 -
 pkg/tendermint/connector.go       | 245 ++++++++++++++++++++++++++++++++++++++
 pkg/tendermint/tendermint.go      | 165 -------------------------
 pkg/tendermint/tendermint_test.go |  60 ----------
 pkg/tendermint/websockets.go      | 204 -------------------------------
 20 files changed, 424 insertions(+), 503 deletions(-)

diff --git a/cmd/service/commands.go b/cmd/service/commands.go
index d3d16bd..20a9300 100644
--- a/cmd/service/commands.go
+++ b/cmd/service/commands.go
@@ -78,6 +78,7 @@ func parseConfig(args []string) (*config.Config, error) {
 
        fs := flag.NewFlagSet("daemon", flag.ExitOnError)
        fs.StringVar(&(cfg.Plugins.Service), "service", cfg.Plugins.Service, 
"Service plugin")
+       fs.StringVar(&(cfg.Log.Level), "log-level", cfg.Log.Level, "Log level")
 
        if err := fs.Parse(args); err != nil {
                return nil, err
diff --git a/cmd/service/main.go b/cmd/service/main.go
index 41bb4a3..9c77efe 100644
--- a/cmd/service/main.go
+++ b/cmd/service/main.go
@@ -21,6 +21,7 @@ Package main - handles config, initialisation and starts the 
service daemon
 package main
 
 import (
+       "context"
        "crypto/rand"
        "fmt"
        "net/http"
@@ -36,6 +37,7 @@ import (
        "github.com/apache/incubator-milagro-dta/libs/ipfs"
        "github.com/apache/incubator-milagro-dta/libs/logger"
        "github.com/apache/incubator-milagro-dta/libs/transport"
+       "github.com/apache/incubator-milagro-dta/pkg/api"
        "github.com/apache/incubator-milagro-dta/pkg/config"
        "github.com/apache/incubator-milagro-dta/pkg/defaultservice"
        "github.com/apache/incubator-milagro-dta/pkg/endpoints"
@@ -138,11 +140,12 @@ func startDaemon(args []string) error {
                cfg.Log.Format,
                cfg.Log.Level,
        )
-
        if err != nil {
                return errors.Wrap(err, "init logger")
        }
 
+       logger.Debug("Logger in DEBUG mode!")
+
        // Create KV store
        logger.Info("Datastore type: %s", cfg.Node.Datastore)
        store, err := initDataStore(cfg.Node.Datastore)
@@ -187,6 +190,12 @@ func startDaemon(args []string) error {
                }
        }
 
+       // Init Tendermint node connector
+       tmConnector, err := 
tendermint.NewNodeConnector(cfg.Blockchain.BroadcastNode, cfg.Node.NodeID, 
store, logger)
+       if err != nil {
+               return errors.Wrap(err, "Blockchain Node connector")
+       }
+
        //The Server must have a valid ID before starting up
        svcPlugin := plugins.FindServicePlugin(cfg.Plugins.Service)
        if svcPlugin == nil {
@@ -200,7 +209,7 @@ func startDaemon(args []string) error {
                defaultservice.WithDataStore(store),
                defaultservice.WithKeyStore(keyStore),
                defaultservice.WithIPFS(ipfsConnector),
-               defaultservice.WithMasterFiduciary(masterFiduciaryServer),
+               defaultservice.WithTendermint(tmConnector),
                defaultservice.WithConfig(cfg),
        ); err != nil {
                return errors.Errorf("init service plugin %s", 
cfg.Plugins.Service)
@@ -223,23 +232,47 @@ func startDaemon(args []string) error {
        }, []string{"method", "success"})
 
        // Stop chan
+       ctx, cancelContext := context.WithCancel(context.Background())
+
        errChan := make(chan error)
 
-       logger.Info("NODE ID (IPFS):  %v", svcPlugin.NodeID())
        logger.Info("Node Type: %v", strings.ToLower(cfg.Node.NodeType))
-       endpoints := endpoints.Endpoints(svcPlugin, cfg.HTTP.CorsAllow, 
authorizer, logger, cfg.Node.NodeType, svcPlugin)
-       httpHandler := transport.NewHTTPHandler(endpoints, logger, duration)
+       logger.Info("Node ID:  %v", svcPlugin.NodeID())
+       logger.Info("Master Fiduciary: %v", svcPlugin.MasterFiduciaryNodeID())
 
        //Connect to Blockchain - Tendermint
-       go tendermint.Subscribe(svcPlugin, store, logger, cfg.Node.NodeID, 
cfg.HTTP.ListenAddr)
-       if err != nil {
-               return errors.Wrap(err, "init Tendermint Blockchain")
-       }
+       go func() {
+               processFn := func(tx *api.BlockChainTX) error {
+                       switch tx.Processor {
+                       case "none":
+                               return nil
+                       case "dump":
+                               svcPlugin.Dump(tx)
+                       case "v1/fulfill/order":
+                               svcPlugin.FulfillOrder(tx)
+                       case "v1/order2":
+                               svcPlugin.Order2(tx)
+                       case "v1/fulfill/order/secret":
+                               svcPlugin.FulfillOrderSecret(tx)
+                       case "v1/order/secret2":
+                               svcPlugin.OrderSecret2(tx)
+                       default:
+                               return errors.New("Unknown processor")
+                       }
+                       return nil
+               }
+
+               logger.Info("Starting Blockchain listener to node: %v", 
cfg.Blockchain.BroadcastNode)
+               errChan <- tmConnector.Subscribe(ctx, processFn)
+               // errChan <- tendermint.Subscribe(svcPlugin, store, logger, 
cfg.Node.NodeID, cfg.HTTP.ListenAddr)
+       }()
 
        // Start the application http server
        go func() {
-               logger.Info("starting listener on %v, custody server %v", 
cfg.HTTP.ListenAddr, cfg.Node.MasterFiduciaryServer)
-               // 
httpHandler.PathPrefix("/api/").Handler(http.St:ripPrefix("/api/", 
http.FileServer(http.Dir("./swagger"))))
+               httpEndpoints := endpoints.Endpoints(svcPlugin, 
cfg.HTTP.CorsAllow, authorizer, logger, cfg.Node.NodeType, svcPlugin)
+               httpHandler := transport.NewHTTPHandler(httpEndpoints, logger, 
duration)
+
+               logger.Info("Starting HTTP listener on %v", cfg.HTTP.ListenAddr)
                errChan <- http.ListenAndServe(cfg.HTTP.ListenAddr, httpHandler)
        }()
 
@@ -247,7 +280,7 @@ func startDaemon(args []string) error {
                http.DefaultServeMux.Handle("/metrics", promhttp.Handler())
                // Start the debug and metrics http server
                go func() {
-                       logger.Info("starting metrics listener on %v", 
cfg.HTTP.MetricsAddr)
+                       logger.Info("Starting metrics listener on %v", 
cfg.HTTP.MetricsAddr)
                        errChan <- http.ListenAndServe(cfg.HTTP.MetricsAddr, 
http.DefaultServeMux)
                }()
        }
@@ -260,7 +293,11 @@ func startDaemon(args []string) error {
        }()
 
        stopErr := <-errChan
-       _ = logger.Log("exit", stopErr.Error())
+       if stopErr != nil {
+               _ = logger.Log("exit", stopErr.Error())
+       }
+
+       cancelContext()
        return store.Close()
 }
 
diff --git a/cmd/servicetester/tendertest1.sh b/cmd/servicetester/tendertest1.sh
index 64fc139..7f91702 100755
--- a/cmd/servicetester/tendertest1.sh
+++ b/cmd/servicetester/tendertest1.sh
@@ -1,8 +1,8 @@
-ref=$(curl -s -X POST "127.0.0.1:5556/v1/order1" -H "accept: */*" -H 
"Content-Type: application/json" -d 
"{\"beneficiaryIDDocumentCID\":\"\",\"extension\":{\"coin\":\"0\"}}")
+ref=$(curl -s -X POST "127.0.0.1:5556/v1/order1" -H "accept: */*" -H 
"Content-Type: application/json" -d 
"{\"beneficiaryIDDocumentCID\":\"QmecffRZTSJDETCRLcjcPe7ynmYNyYpnh7WKzKTdmX1GBZ\",\"extension\":{\"coin\":\"0\"}}")
 
 #sleep long enough for blockchain to catch up
 sleep 4
 
-curl -X POST "127.0.0.1:5556/v1/order/secret1" -H "accept: */*" -H 
"Content-Type: application/json" -d 
"{\"orderReference\":$ref,\"beneficiaryIDDocumentCID\":\"QmcyJqEMqNEEYHrNSyUY83CQCNwZ5yVan3SgaQ4NchsqsC\"}"
+curl -X POST "127.0.0.1:5556/v1/order/secret1" -H "accept: */*" -H 
"Content-Type: application/json" -d 
"{\"orderReference\":$ref,\"beneficiaryIDDocumentCID\":\"QmecffRZTSJDETCRLcjcPe7ynmYNyYpnh7WKzKTdmX1GBZ\"}"
 
 
diff --git a/go.mod b/go.mod
index 7e0150c..860eb9c 100644
--- a/go.mod
+++ b/go.mod
@@ -2,8 +2,6 @@ module github.com/apache/incubator-milagro-dta
 
 require (
        github.com/TylerBrock/colorjson v0.0.0-20180527164720-95ec53f28296
-       github.com/btcsuite/btcd v0.0.0-20190427004231-96897255fd17
-       github.com/VividCortex/gohistogram v1.0.0 // indirect
        github.com/btcsuite/btcd v0.0.0-20190824003749-130ea5bddde3
        github.com/btcsuite/btcutil v0.0.0-20190425235716-9e5f4b9a998d
        github.com/coreos/go-oidc v2.0.0+incompatible
diff --git a/go.sum b/go.sum
index 1ee5995..fd46e0f 100644
--- a/go.sum
+++ b/go.sum
@@ -16,8 +16,8 @@ github.com/TylerBrock/colorjson 
v0.0.0-20180527164720-95ec53f28296 h1:JYWTroLXcN
 github.com/TylerBrock/colorjson v0.0.0-20180527164720-95ec53f28296/go.mod 
h1:VSw57q4QFiWDbRnjdX8Cb3Ow0SFncRw+bA/ofY6Q83w=
 github.com/VividCortex/gohistogram v1.0.0 
h1:6+hBz+qvs0JOrrNhhmR7lFxo5sINxBCGXrdtl/UvroE=
 github.com/VividCortex/gohistogram v1.0.0/go.mod 
h1:Pf5mBqqDxYaXu3hDrrU+w6nw50o/4+TcAqDqk/vUH7g=
-github.com/Workiva/go-datastructures v1.0.50/go.mod 
h1:Z+F2Rca0qCsVYDS8z7bAGm8f3UkzuWYS/oBZz5a7VVA=
 github.com/VividCortex/gohistogram v1.0.0/go.mod 
h1:Pf5mBqqDxYaXu3hDrrU+w6nw50o/4+TcAqDqk/vUH7g=
+github.com/Workiva/go-datastructures v1.0.50/go.mod 
h1:Z+F2Rca0qCsVYDS8z7bAGm8f3UkzuWYS/oBZz5a7VVA=
 github.com/aead/siphash v1.0.1 h1:FwHfE/T45KPKYuuSAKyyvE+oPWcaQ+CUmFW0bPlM+kg=
 github.com/aead/siphash v1.0.1/go.mod 
h1:Nywa3cDsYNNK3gaciGTWPwHt0wlpNV15vwmswBAUSII=
 github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod 
h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
@@ -105,9 +105,9 @@ github.com/fortytw2/leaktest v1.3.0 
h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8
 github.com/fortytw2/leaktest v1.3.0/go.mod 
h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g=
 github.com/fsnotify/fsnotify v1.4.7 
h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV9I=
 github.com/fsnotify/fsnotify v1.4.7/go.mod 
h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
+github.com/fsnotify/fsnotify v1.4.7/go.mod 
h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
 github.com/ghodss/yaml v1.0.0/go.mod 
h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04=
 github.com/go-check/check v0.0.0-20180628173108-788fd7840127 
h1:0gkP6mzaMqkmpcJYCFOLkIBwI7xFExG03bbkOkCvUPI=
-github.com/fsnotify/fsnotify v1.4.7/go.mod 
h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
 github.com/go-check/check v0.0.0-20180628173108-788fd7840127/go.mod 
h1:9ES+weclKsC9YodN5RgxqK/VD9HM9JsCSh7rNhMZE98=
 github.com/go-critic/go-critic v0.3.5-0.20190526074819-1df300866540/go.mod 
h1:+sE8vrLDS2M0pZkBk0wy6+nLdKexVDrl/jBqQOTDThA=
 github.com/go-kit/kit v0.8.0 h1:Wz+5lgoB0kkuqLEc6NVmwRknTKP6dTGbSqvhZtBI/j0=
@@ -689,6 +689,7 @@ github.com/tendermint/go-amino v0.14.1 
h1:o2WudxNfdLNBwMyl2dqOJxiro5rfrEaU0Ugs6o
 github.com/tendermint/go-amino v0.14.1/go.mod 
h1:i/UKE5Uocn+argJJBb12qTZsCDBcAYMbR92AaJVmKso=
 github.com/tendermint/tendermint v0.32.4 
h1:KwZIMtT+ROvfMYO3wine6F9hak3SpngcRcAIzys1J3I=
 github.com/tendermint/tendermint v0.32.4/go.mod 
h1:D2+A3pNjY+Po72X0mTfaXorFhiVI8dh/Zg640FGyGtE=
+github.com/tendermint/tendermint v0.32.5 
h1:2hCLwuzfCKZxXSe/+iMEl+ChJWKJx6g/Wcvq3NMxVN4=
 github.com/tendermint/tm-db v0.2.0 
h1:rJxgdqn6fIiVJZy4zLpY1qVlyD0TU6vhkT4kEf71TQQ=
 github.com/tendermint/tm-db v0.2.0/go.mod 
h1:0cPKWu2Mou3IlxecH+MEUSYc1Ch537alLe6CpFrKzgw=
 github.com/texttheater/golang-levenshtein 
v0.0.0-20180516184445-d188e65d659e/go.mod 
h1:XDKHRm5ThF8YJjx001LtgelzsoaEcvnA7lVWz9EeX3g=
diff --git a/libs/documents/docs.go b/libs/documents/docs.go
index 7e9fe27..59bad92 100644
--- a/libs/documents/docs.go
+++ b/libs/documents/docs.go
@@ -93,7 +93,7 @@ func DecodeIDDocument(rawdoc []byte, tag string, idDocument 
*IDDoc) error {
        return nil
 }
 
-//PeekOrderDocument - look at the header inside an order document before 
decryption
+//OrderDocumentSigner - look at the header inside an order document before 
decryption
 func OrderDocumentSigner(rawDoc []byte) (string, error) {
        signedEnvelope := SignedEnvelope{}
        err := proto.Unmarshal(rawDoc, &signedEnvelope)
diff --git a/pkg/api/proto.go b/pkg/api/proto.go
index aedcd2c..e3200d7 100644
--- a/pkg/api/proto.go
+++ b/pkg/api/proto.go
@@ -24,6 +24,8 @@ package api
 */
 
 import (
+       "crypto/sha256"
+       "encoding/hex"
        "time"
 )
 
@@ -51,6 +53,14 @@ type BlockChainTX struct {
        Tags                   map[string]string
 }
 
+// CalcHash calculates, sets the TXhash and returns the string representation
+func (tx *BlockChainTX) CalcHash() string {
+       txSha := sha256.Sum256(tx.Payload)
+       tx.TXhash = txSha[:]
+       return hex.EncodeToString(txSha[:])
+
+}
+
 //CreateIdentityRequest -
 type CreateIdentityRequest struct {
        Name      string            `json:"name,omitempty" 
validate:"required,alphanum"`
diff --git a/pkg/common/chain.go b/pkg/common/chain.go
index d7a7023..a0438b5 100644
--- a/pkg/common/chain.go
+++ b/pkg/common/chain.go
@@ -10,16 +10,8 @@ import (
 )
 
 // CreateTX creates the transaction ready for write to the chain
-func CreateTX(nodeID string, store *datastore.Store, id string, order 
*documents.OrderDoc, recipients map[string]documents.IDDoc) ([]byte, []byte, 
error) {
-       secrets := &IdentitySecrets{}
-       if err := store.Get("id-doc", nodeID, secrets); err != nil {
-               return nil, nil, errors.New("load secrets from store")
-       }
-       blsSecretKey, err := hex.DecodeString(secrets.BLSSecretKey)
-       if err != nil {
-               return nil, nil, errors.Wrap(err, "Decode identity secrets")
-       }
-       rawDoc, err := documents.EncodeOrderDocument(nodeID, *order, 
blsSecretKey, "previousID", recipients)
+func CreateTX(nodeID string, store *datastore.Store, blsSecretKey []byte, id 
string, order *documents.OrderDoc, recipients map[string]*documents.IDDoc) 
([]byte, []byte, error) {
+       rawDoc, err := documents.EncodeOrderDocument(nodeID, *order, 
blsSecretKey, recipients)
        if err != nil {
                return nil, nil, errors.Wrap(err, "Failed to encode IDDocument")
        }
@@ -32,7 +24,7 @@ func CreateTX(nodeID string, store *datastore.Store, id 
string, order *documents
        return TXID[:], rawDoc, nil
 }
 
-//Decode a transaction for header data but don't decrypt it
+//PeekTX Decode a transaction for header data but don't decrypt it
 func PeekTX(tx []byte) (string, error) {
        signerCID, err := documents.OrderDocumentSigner(tx)
        print(signerCID)
diff --git a/pkg/common/common.go b/pkg/common/common.go
index 4e8220b..cdca4fe 100644
--- a/pkg/common/common.go
+++ b/pkg/common/common.go
@@ -105,6 +105,7 @@ func RetrieveSeed(store *datastore.Store, reference string) 
(seedHex string, err
        return seedHex, nil
 }
 
+//WriteOrderToStore stores an order
 func WriteOrderToStore(store *datastore.Store, orderReference string, address 
string) error {
        if err := store.Set("order", orderReference, address, 
map[string]string{"time": time.Now().UTC().Format(time.RFC3339)}); err != nil {
                return errors.New("Save Order to store")
@@ -113,8 +114,8 @@ func WriteOrderToStore(store *datastore.Store, 
orderReference string, address st
 }
 
 // BuildRecipientList builds a list of recipients who are able to decrypt the 
encrypted envelope
-func BuildRecipientList(ipfs ipfs.Connector, IDDocs ...string) 
(map[string]documents.IDDoc, error) {
-       recipients := make(map[string]documents.IDDoc)
+func BuildRecipientList(ipfs ipfs.Connector, IDDocs ...string) 
(map[string]*documents.IDDoc, error) {
+       recipients := make(map[string]*documents.IDDoc)
        for _, v := range IDDocs {
                iddoc, err := RetrieveIDDocFromIPFS(ipfs, v)
                if err != nil {
diff --git a/pkg/defaultservice/fulfillTX.go b/pkg/defaultservice/fulfillTX.go
index b0c1025..b0527a9 100644
--- a/pkg/defaultservice/fulfillTX.go
+++ b/pkg/defaultservice/fulfillTX.go
@@ -25,9 +25,10 @@ import (
        "github.com/apache/incubator-milagro-dta/libs/documents"
        "github.com/apache/incubator-milagro-dta/pkg/api"
        "github.com/apache/incubator-milagro-dta/pkg/common"
-       "github.com/apache/incubator-milagro-dta/pkg/tendermint"
+       "github.com/apache/incubator-milagro-dta/pkg/identity"
 )
 
+// FulfillOrder TX
 func (s *Service) FulfillOrder(tx *api.BlockChainTX) (string, error) {
        nodeID := s.NodeID()
        reqPayload := tx.Payload
@@ -40,7 +41,16 @@ func (s *Service) FulfillOrder(tx *api.BlockChainTX) 
(string, error) {
        }
        remoteIDDocCID := signerID
 
-       _, _, _, sikeSK, err := common.RetrieveIdentitySecrets(s.Store, nodeID)
+       // SIKE and BLS keys
+       keyseed, err := s.KeyStore.Get("seed")
+       if err != nil {
+               return "", err
+       }
+       _, sikeSK, err := identity.GenerateSIKEKeys(keyseed)
+       if err != nil {
+               return "", err
+       }
+       _, blsSK, err := identity.GenerateBLSKeys(keyseed)
        if err != nil {
                return "", err
        }
@@ -83,7 +93,7 @@ func (s *Service) FulfillOrder(tx *api.BlockChainTX) (string, 
error) {
        }
 
        //Create a new Transaction payload and TX
-       txHash, payload, err := common.CreateTX(nodeID, s.Store, nodeID, order, 
recipientList)
+       txHash, payload, err := common.CreateTX(nodeID, s.Store, blsSK, nodeID, 
order, recipientList)
 
        //Write the requests to the chain
        chainTX := &api.BlockChainTX{
@@ -95,8 +105,8 @@ func (s *Service) FulfillOrder(tx *api.BlockChainTX) 
(string, error) {
                TXhash:                 txHash,
                Tags:                   map[string]string{"reference": 
order.Reference, "txhash": hex.EncodeToString(txHash)},
        }
-       return tendermint.PostToChain(chainTX, "FulfillOrder")
 
+       return s.Tendermint.PostTx(chainTX, "FulfillOrder")
 }
 
 // FulfillOrderSecret -
@@ -112,7 +122,16 @@ func (s *Service) FulfillOrderSecret(tx *api.BlockChainTX) 
(string, error) {
        }
        remoteIDDocCID := signerID
 
-       _, _, _, sikeSK, err := common.RetrieveIdentitySecrets(s.Store, nodeID)
+       // SIKE and BLS keys
+       keyseed, err := s.KeyStore.Get("seed")
+       if err != nil {
+               return "", err
+       }
+       _, sikeSK, err := identity.GenerateSIKEKeys(keyseed)
+       if err != nil {
+               return "", err
+       }
+       _, blsSK, err := identity.GenerateBLSKeys(keyseed)
        if err != nil {
                return "", err
        }
@@ -155,7 +174,7 @@ func (s *Service) FulfillOrderSecret(tx *api.BlockChainTX) 
(string, error) {
        }
 
        //Create a new Transaction payload and TX
-       txHash, payload, err := common.CreateTX(nodeID, s.Store, nodeID, order, 
recipientList)
+       txHash, payload, err := common.CreateTX(nodeID, s.Store, blsSK, nodeID, 
order, recipientList)
 
        //Write the requests to the chain
        chainTX := &api.BlockChainTX{
@@ -167,5 +186,6 @@ func (s *Service) FulfillOrderSecret(tx *api.BlockChainTX) 
(string, error) {
                Payload: payload,
                Tags:    map[string]string{"reference": order.Reference, 
"txhash": hex.EncodeToString(txHash)},
        }
-       return tendermint.PostToChain(chainTX, "FulfillOrderSecret")
+
+       return s.Tendermint.PostTx(chainTX, "FulfillOrderSecret")
 }
diff --git a/pkg/defaultservice/init.go b/pkg/defaultservice/init.go
index f0c0190..e4874ae 100644
--- a/pkg/defaultservice/init.go
+++ b/pkg/defaultservice/init.go
@@ -24,8 +24,8 @@ import (
        "github.com/apache/incubator-milagro-dta/libs/ipfs"
        "github.com/apache/incubator-milagro-dta/libs/keystore"
        "github.com/apache/incubator-milagro-dta/libs/logger"
-       "github.com/apache/incubator-milagro-dta/pkg/api"
        "github.com/apache/incubator-milagro-dta/pkg/config"
+       "github.com/apache/incubator-milagro-dta/pkg/tendermint"
 )
 
 // ServiceOption function to set Service properties
@@ -84,10 +84,18 @@ func WithIPFS(ipfsConnector ipfs.Connector) ServiceOption {
        }
 }
 
+// WithTendermint adds tendermint node connector to the Service
+func WithTendermint(tmConnector *tendermint.NodeConnector) ServiceOption {
+       return func(s *Service) error {
+               s.Tendermint = tmConnector
+               return nil
+       }
+}
+
 // WithMasterFiduciary adds master fiduciary connector to the Service
-func WithMasterFiduciary(masterFiduciaryServer api.ClientService) 
ServiceOption {
+func WithMasterFiduciary(masterFiduciaryNodeID string) ServiceOption {
        return func(s *Service) error {
-               s.MasterFiduciaryServer = masterFiduciaryServer
+               s.SetMasterFiduciaryNodeID(masterFiduciaryNodeID)
                return nil
        }
 }
diff --git a/pkg/defaultservice/order.go b/pkg/defaultservice/order.go
index acc6a6d..3f60158 100644
--- a/pkg/defaultservice/order.go
+++ b/pkg/defaultservice/order.go
@@ -27,7 +27,6 @@ import (
        "github.com/apache/incubator-milagro-dta/pkg/api"
        "github.com/apache/incubator-milagro-dta/pkg/common"
        "github.com/apache/incubator-milagro-dta/pkg/identity"
-       "github.com/apache/incubator-milagro-dta/pkg/tendermint"
        "github.com/pkg/errors"
 )
 
@@ -111,8 +110,6 @@ func (s *Service) PrepareOrderPart1(order 
*documents.OrderDoc, reqExtension map[
 }
 
 // PrepareOrderResponse gets the updated order and returns the commitment and 
extension
-//func (s *Service) PrepareOrderResponse(orderPart2 *documents.OrderDoc, 
reqExtension, fulfillExtension map[string]string) (commitment string, extension 
map[string]string, err error) {
-
 func (s *Service) PrepareOrderResponse(orderPart2 *documents.OrderDoc) 
(commitment string, extension map[string]string, err error) {
        return orderPart2.OrderPart2.CommitmentPublicKey, nil, nil
 }
@@ -158,8 +155,18 @@ func (s *Service) Order1(req *api.OrderRequest) (string, 
error) {
                order.OrderReqExtension[key] = value
        }
 
+       // BLS key
+       keyseed, err := s.KeyStore.Get("seed")
+       if err != nil {
+               return "", err
+       }
+       _, blsSK, err := identity.GenerateBLSKeys(keyseed)
+       if err != nil {
+               return "", err
+       }
+
        //This is serialized and output to the chain
-       txHash, payload, err := common.CreateTX(nodeID, s.Store, nodeID, order, 
recipientList)
+       txHash, payload, err := common.CreateTX(nodeID, s.Store, blsSK, nodeID, 
order, recipientList)
 
        //Write the requests to the chain
        chainTX := &api.BlockChainTX{
@@ -171,7 +178,11 @@ func (s *Service) Order1(req *api.OrderRequest) (string, 
error) {
                TXhash:                 txHash,
                Tags:                   map[string]string{"reference": 
order.Reference, "txhash": hex.EncodeToString(txHash)},
        }
-       tendermint.PostToChain(chainTX, "Order1")
+
+       if _, err := s.Tendermint.PostTx(chainTX, "Order1"); err != nil {
+               return "", err
+       }
+
        return order.Reference, nil
 }
 
@@ -196,11 +207,11 @@ func (s *Service) OrderSecret1(req 
*api.OrderSecretRequest) (string, error) {
        // SIKE and BLS keys
        keyseed, err := s.KeyStore.Get("seed")
        if err != nil {
-               return nil, err
+               return "", err
        }
        _, sikeSK, err := identity.GenerateSIKEKeys(keyseed)
        if err != nil {
-               return nil, err
+               return "", err
        }
        _, blsSK, err := identity.GenerateBLSKeys(keyseed)
        if err != nil {
@@ -212,7 +223,7 @@ func (s *Service) OrderSecret1(req *api.OrderSecretRequest) 
(string, error) {
                return "", err
        }
 
-       tx, err := tendermint.TXbyHash(previousOrderHash)
+       tx, err := s.Tendermint.GetTx(previousOrderHash)
        if err != nil {
                return "", err
        }
@@ -258,7 +269,7 @@ func (s *Service) OrderSecret1(req *api.OrderSecretRequest) 
(string, error) {
                Timestamp:                time.Now().Unix(),
        }
 
-       txHash, payload, err := common.CreateTX(nodeID, s.Store, nodeID, order, 
recipientList)
+       txHash, payload, err := common.CreateTX(nodeID, s.Store, blsSK, nodeID, 
order, recipientList)
 
        //Write the requests to the chain
        chainTX := &api.BlockChainTX{
@@ -269,6 +280,10 @@ func (s *Service) OrderSecret1(req 
*api.OrderSecretRequest) (string, error) {
                Payload:                payload,
                Tags:                   map[string]string{"reference": 
order.Reference, "txhash": hex.EncodeToString(txHash)},
        }
-       tendermint.PostToChain(chainTX, "OrderSecret1")
+
+       if _, err := s.Tendermint.PostTx(chainTX, "OrderSecret1"); err != nil {
+               return "", err
+       }
+
        return order.Reference, nil
 }
diff --git a/pkg/defaultservice/orderTX.go b/pkg/defaultservice/orderTX.go
index 4fe62f4..d6188c9 100644
--- a/pkg/defaultservice/orderTX.go
+++ b/pkg/defaultservice/orderTX.go
@@ -19,11 +19,12 @@ package defaultservice
 
 import (
        "encoding/hex"
+       "fmt"
 
        "github.com/apache/incubator-milagro-dta/libs/documents"
        "github.com/apache/incubator-milagro-dta/pkg/api"
        "github.com/apache/incubator-milagro-dta/pkg/common"
-       "github.com/apache/incubator-milagro-dta/pkg/tendermint"
+       "github.com/apache/incubator-milagro-dta/pkg/identity"
        "github.com/pkg/errors"
 )
 
@@ -38,7 +39,16 @@ func (s *Service) Order2(tx *api.BlockChainTX) (string, 
error) {
                return "", err
        }
 
-       _, _, _, sikeSK, err := common.RetrieveIdentitySecrets(s.Store, nodeID)
+       // SIKE and BLS keys
+       keyseed, err := s.KeyStore.Get("seed")
+       if err != nil {
+               return "", err
+       }
+       _, sikeSK, err := identity.GenerateSIKEKeys(keyseed)
+       if err != nil {
+               return "", err
+       }
+       _, blsSK, err := identity.GenerateBLSKeys(keyseed)
        if err != nil {
                return "", err
        }
@@ -73,7 +83,7 @@ func (s *Service) Order2(tx *api.BlockChainTX) (string, 
error) {
        }
 
        //Generate a transaction
-       txHash, payload, err := common.CreateTX(nodeID, s.Store, nodeID, order, 
recipientList)
+       txHash, payload, err := common.CreateTX(nodeID, s.Store, blsSK, nodeID, 
order, recipientList)
 
        //Write the Order2 results to the chain
        chainTX := &api.BlockChainTX{
@@ -84,8 +94,8 @@ func (s *Service) Order2(tx *api.BlockChainTX) (string, 
error) {
                Payload:                payload,
                Tags:                   map[string]string{"reference": 
order.Reference, "txhash": hex.EncodeToString(txHash)},
        }
-       return tendermint.PostToChain(chainTX, "Order2")
 
+       return s.Tendermint.PostTx(chainTX, "Order2")
 }
 
 // OrderSecret2 - Process an incoming Blockchain Order/Secret transaction from 
a MasterFiduciary, to generate the final secret
@@ -94,7 +104,16 @@ func (s *Service) OrderSecret2(tx *api.BlockChainTX) 
(string, error) {
        reqPayload := tx.Payload
        txHashString := hex.EncodeToString(tx.TXhash)
 
-       _, _, _, sikeSK, err := common.RetrieveIdentitySecrets(s.Store, nodeID)
+       // SIKE and BLS keys
+       keyseed, err := s.KeyStore.Get("seed")
+       if err != nil {
+               return "", err
+       }
+       _, sikeSK, err := identity.GenerateSIKEKeys(keyseed)
+       if err != nil {
+               return "", err
+       }
+       _, blsSK, err := identity.GenerateBLSKeys(keyseed)
        if err != nil {
                return "", err
        }
@@ -107,17 +126,16 @@ func (s *Service) OrderSecret2(tx *api.BlockChainTX) 
(string, error) {
        //Decode the Order from the supplied TX
        order := &documents.OrderDoc{}
        err = documents.DecodeOrderDocument(reqPayload, txHashString, order, 
sikeSK, nodeID, remoteIDDoc.BLSPublicKey)
+       if err != nil {
+               fmt.Println("ERROR DEcode Order:", err)
+               return "", err
+       }
 
        if order.BeneficiaryCID != nodeID {
                return "", errors.New("Invalid Processor")
        }
 
-       _, seed, _, sikeSK, err := common.RetrieveIdentitySecrets(s.Store, 
nodeID)
-       if err != nil {
-               return "", err
-       }
-
-       finalPrivateKey, _, extension, err := s.Plugin.ProduceFinalSecret(seed, 
sikeSK, order, order, nodeID)
+       finalPrivateKey, _, extension, err := 
s.Plugin.ProduceFinalSecret(keyseed, sikeSK, order, order, nodeID)
        if err != nil {
                return "", err
        }
@@ -140,7 +158,7 @@ func (s *Service) OrderSecret2(tx *api.BlockChainTX) 
(string, error) {
        if err != nil {
                return "", err
        }
-       txHash, payload, err := common.CreateTX(nodeID, s.Store, nodeID, order, 
recipientList)
+       txHash, payload, err := common.CreateTX(nodeID, s.Store, blsSK, nodeID, 
order, recipientList)
 
        //Write the requests to the chain
        chainTX := &api.BlockChainTX{
@@ -151,5 +169,6 @@ func (s *Service) OrderSecret2(tx *api.BlockChainTX) 
(string, error) {
                Payload:                payload,
                Tags:                   map[string]string{"reference": 
order.Reference, "txhash": hex.EncodeToString(txHash)},
        }
-       return tendermint.PostToChain(chainTX, "OrderSecret2")
+
+       return s.Tendermint.PostTx(chainTX, "OrderSecret2")
 }
diff --git a/pkg/defaultservice/service.go b/pkg/defaultservice/service.go
index 9299b92..2b9fe0d 100644
--- a/pkg/defaultservice/service.go
+++ b/pkg/defaultservice/service.go
@@ -34,7 +34,8 @@ import (
        "github.com/apache/incubator-milagro-dta/libs/transport"
        "github.com/apache/incubator-milagro-dta/pkg/api"
        "github.com/apache/incubator-milagro-dta/pkg/common"
-       "github.com/apache/incubator-milagro-dta/pkg/config"
+       "github.com/apache/incubator-milagro-dta/pkg/identity"
+       "github.com/apache/incubator-milagro-dta/pkg/tendermint"
        "github.com/hokaccha/go-prettyjson"
 )
 
@@ -52,6 +53,7 @@ type Service struct {
        Store                 *datastore.Store
        KeyStore              keystore.Store
        Ipfs                  ipfs.Connector
+       Tendermint            *tendermint.NodeConnector
        nodeID                string
        masterFiduciaryNodeID string
 }
@@ -115,7 +117,12 @@ func (s *Service) Dump(tx *api.BlockChainTX) error {
                return err
        }
 
-       _, _, _, sikeSK, err := common.RetrieveIdentitySecrets(s.Store, nodeID)
+       // SIKE and BLS keys
+       keyseed, err := s.KeyStore.Get("seed")
+       if err != nil {
+               return err
+       }
+       _, sikeSK, err := identity.GenerateSIKEKeys(keyseed)
        if err != nil {
                return err
        }
@@ -127,6 +134,8 @@ func (s *Service) Dump(tx *api.BlockChainTX) error {
        fmt.Println(string(pp))
 
        return nil
+}
+
 // Endpoints for extending the plugin endpoints
 func (s *Service) Endpoints() (namespace string, endpoints 
transport.HTTPEndpoints) {
        return s.Name(), nil
diff --git a/pkg/tendermint/cmd/cmd b/pkg/tendermint/cmd/cmd
deleted file mode 100755
index 0e07c4f..0000000
Binary files a/pkg/tendermint/cmd/cmd and /dev/null differ
diff --git a/pkg/tendermint/config.go b/pkg/tendermint/config.go
deleted file mode 100644
index 510189d..0000000
--- a/pkg/tendermint/config.go
+++ /dev/null
@@ -1,6 +0,0 @@
-package tendermint
-
-const (
-       //node = "127.0.0.1:26657"
-       node = "34.246.173.153:26657"
-)
diff --git a/pkg/tendermint/connector.go b/pkg/tendermint/connector.go
new file mode 100644
index 0000000..0dab29e
--- /dev/null
+++ b/pkg/tendermint/connector.go
@@ -0,0 +1,245 @@
+package tendermint
+
+import (
+       "context"
+       "encoding/base64"
+       "encoding/json"
+       "fmt"
+       "io/ioutil"
+       "net/http"
+       "strconv"
+       "strings"
+       "time"
+
+       "github.com/apache/incubator-milagro-dta/libs/datastore"
+       "github.com/apache/incubator-milagro-dta/libs/logger"
+       "github.com/apache/incubator-milagro-dta/pkg/api"
+       status "github.com/apache/incubator-milagro-dta/pkg/tendermint/status"
+       "github.com/pkg/errors"
+       tmclient "github.com/tendermint/tendermint/rpc/client"
+       tmtypes "github.com/tendermint/tendermint/types"
+)
+
+const (
+       nodeConnectionTimeout = time.Second * 10
+       txChanSize            = 1000
+)
+
+// ProcessTXFunc is executed on each incoming TX
+type ProcessTXFunc func(tx *api.BlockChainTX) error
+
+// NodeConnector is using external tendermint node to post and get transactions
+type NodeConnector struct {
+       nodeID     string
+       tmNodeAddr string
+       httpClient *http.Client
+       tmClient   *tmclient.HTTP
+       log        *logger.Logger
+       store      *datastore.Store
+}
+
+// NewNodeConnector constructs a new Tendermint NodeConnector
+func NewNodeConnector(tmNodeAddr string, nodeID string, store 
*datastore.Store, log *logger.Logger) (conn *NodeConnector, err error) {
+       defer func() {
+               if r := recover(); r != nil {
+                       err = errors.Errorf("Initialize tendermint node 
connector: %v", r)
+               }
+       }()
+
+       tmNodeAddr = strings.TrimRight(tmNodeAddr, "/")
+       tmClient := tmclient.NewHTTP(fmt.Sprintf("tcp://%s", tmNodeAddr), 
"/websocket")
+       if err := tmClient.Start(); err != nil {
+               return nil, errors.Wrap(err, "Start tendermint client")
+       }
+
+       return &NodeConnector{
+               tmNodeAddr: tmNodeAddr,
+               nodeID:     nodeID,
+               log:        log,
+               store:      store,
+               httpClient: &http.Client{
+                       Timeout: nodeConnectionTimeout,
+               },
+               tmClient: tmClient,
+       }, nil
+
+}
+
+// Stop is performing clean-up
+func (nc *NodeConnector) Stop() error {
+       return nc.tmClient.Stop()
+}
+
+// GetTx retreives a transaction by hash
+func (nc *NodeConnector) GetTx(txHash string) (*api.BlockChainTX, error) {
+       query := fmt.Sprintf("tag.txhash='%s'", txHash)
+       result, err := nc.tmClient.TxSearch(query, true, 1, 1)
+       if err != nil {
+               return nil, err
+       }
+       if len(result.Txs) == 0 {
+               return nil, errors.New("Transaction not found")
+       }
+
+       payload := &api.BlockChainTX{}
+       if err := json.Unmarshal(result.Txs[0].Tx, &payload); err != nil {
+               return nil, err
+       }
+
+       return payload, nil
+}
+
+// PostTx posts a transaction to the chain and returns the transaction ID
+func (nc *NodeConnector) PostTx(tx *api.BlockChainTX, method string) (txID 
string, err error) {
+       txID = tx.CalcHash()
+
+       //serialize the whole transaction
+       serializedTX, err := json.Marshal(tx)
+       if err != nil {
+               return
+       }
+       base64EncodedTX := base64.StdEncoding.EncodeToString(serializedTX)
+
+       // TODO: use net/rpc
+       body := strings.NewReader(`{
+               "jsonrpc": "2.0",
+               "id": "anything",
+               "method": "broadcast_tx_commit",
+               "params": {
+                       "tx": "` + base64EncodedTX + `"}
+       }`)
+       url := "http://"; + nc.tmNodeAddr
+
+       req, err := http.NewRequest("POST", url, body)
+       if err != nil {
+               return "", errors.Wrap(err, "post to blockchain node")
+       }
+       req.Header.Set("Content-Type", "text/plain;")
+
+       resp, err := nc.httpClient.Do(req)
+       if err != nil {
+               return "", errors.Wrap(err, "post to blockchain node")
+       }
+       defer resp.Body.Close()
+
+       if resp.StatusCode != http.StatusOK {
+               var respErr string
+               if b, err := ioutil.ReadAll(resp.Body); err != nil {
+                       respErr = resp.Status
+               } else {
+                       respErr = string(b)
+               }
+
+               return "", errors.Errorf("Post to blockchain node status %v: 
%v", resp.StatusCode, respErr)
+       }
+
+       nc.log.Debug("POST TO CHAIN: METHOD: %s CALLS: %s  - TXID: %s", method, 
tx.Processor, txID)
+
+       return
+}
+
+// Subscribe connects to the Tendermint node and collect the events
+func (nc *NodeConnector) Subscribe(ctx context.Context, processFn 
ProcessTXFunc) error {
+       chainStatus, err := nc.getChainStatus()
+       if err != nil {
+               return err
+       }
+
+       currentBlockHeight, err := 
strconv.Atoi(chainStatus.Result.SyncInfo.LatestBlockHeight)
+       if err != nil {
+               return errors.Wrap(err, "Failed to obtain latest blockheight of 
Blockchain")
+       }
+
+       var processedToHeight int
+       if err := nc.store.Get("chain", "height", &processedToHeight); err != 
nil {
+               if err != datastore.ErrKeyNotFound {
+                       return errors.Wrap(err, "Get last processed block 
height")
+               }
+       }
+
+       nc.log.Debug("Block height: Current: %v; Processed: %v", 
currentBlockHeight, processedToHeight)
+
+       // create the transaction queue chan
+       txQueue := make(chan *api.BlockChainTX, txChanSize)
+
+       // Collect events
+       if err := nc.subscribeAndQueue(ctx, txQueue); err != nil {
+               return err
+       }
+
+       // TODO: load historicTX
+
+       // Process events
+       return nc.processTXQueue(ctx, txQueue, processFn)
+}
+
+func (nc *NodeConnector) subscribeAndQueue(ctx context.Context, txQueue chan 
*api.BlockChainTX) error {
+       query := "tag.recipient='" + nc.nodeID + "'"
+
+       out, err := nc.tmClient.Subscribe(context.Background(), "test", query, 
1000)
+       if err != nil {
+               return errors.Wrapf(err, "Failed to subscribe to query %s", 
query)
+       }
+
+       go func() {
+               for {
+                       select {
+                       case result := <-out:
+                               tx := result.Data.(tmtypes.EventDataTx).Tx
+                               payload := &api.BlockChainTX{}
+                               err := json.Unmarshal(tx, payload)
+                               if err != nil {
+                                       nc.log.Debug("IGNORED TX - Invalid!")
+                                       break
+                               }
+
+                               //check if this node is in receipient list
+                               if payload.RecipientID != nc.nodeID {
+                                       nc.log.Debug("IGNORED TX! Recipient not 
match the query! (%v != %v)", payload.RecipientID, nc.nodeID)
+                                       break
+                               }
+
+                               //Add into the waitingQueue for later processing
+                               txQueue <- payload
+                       case <-ctx.Done():
+                               return
+                       }
+               }
+       }()
+
+       return nil
+}
+
+func (nc *NodeConnector) processTXQueue(ctx context.Context, txQueue chan 
*api.BlockChainTX, processFn ProcessTXFunc) error {
+       for {
+               select {
+               case tx := <-txQueue:
+                       if err := processFn(tx); err != nil {
+                               // TODO: errors block processing the queue
+                               return err
+                       }
+                       // TODO: store the last block height
+               case <-ctx.Done():
+                       return nil
+               }
+       }
+}
+
+func (nc *NodeConnector) getChainStatus() (*status.StatusResponse, error) {
+       url := fmt.Sprintf("http://%s/status";, nc.tmNodeAddr)
+       resp, err := nc.httpClient.Get(url)
+       if err != nil {
+               return nil, errors.Wrap(err, "Get node status")
+       }
+       defer resp.Body.Close()
+       if resp.StatusCode != http.StatusOK {
+               return nil, errors.Errorf("Get node status status code: %v", 
resp.StatusCode)
+       }
+
+       status := &status.StatusResponse{}
+       if err := json.NewDecoder(resp.Body).Decode((&status)); err != nil {
+               return nil, errors.Wrap(err, "Invalid node status response")
+       }
+
+       return status, nil
+}
diff --git a/pkg/tendermint/tendermint.go b/pkg/tendermint/tendermint.go
deleted file mode 100644
index e02b3c1..0000000
--- a/pkg/tendermint/tendermint.go
+++ /dev/null
@@ -1,165 +0,0 @@
-package tendermint
-
-import (
-       "bufio"
-       "crypto/sha256"
-       "encoding/base64"
-       "encoding/hex"
-       "encoding/json"
-       "errors"
-       "fmt"
-       "net/http"
-       "strings"
-
-       "github.com/apache/incubator-milagro-dta/pkg/api"
-       "github.com/apache/incubator-milagro-dta/pkg/service"
-)
-
-//QueryChain the blockchain for an index
-func QueryChain(index string) (string, string) {
-       url := "http://"; + node + "/abci_query?data=\"" + index + "\""
-       resp, err := http.Get(url)
-       if err != nil {
-               // handle err
-       }
-       defer resp.Body.Close()
-       scanner := bufio.NewScanner(resp.Body)
-       scanner.Split(bufio.ScanBytes)
-       t := ""
-       for scanner.Scan() {
-               t += scanner.Text()
-               ///fmt.Print(scanner.Text())
-       }
-
-       res, _ := UnmarshalChainQuery([]byte(t))
-
-       val := res.Result.Response.Value
-       decodeVal, _ := base64.StdEncoding.DecodeString(val)
-       return string(decodeVal), val
-}
-
-//PostToChain - send TX data to the Blockchain
-func PostToChain(tx *api.BlockChainTX, method string) (string, error) {
-       //Create TX Hash
-
-       tx.RecipientID = tx.RecipientID
-
-       TXID := sha256.Sum256(tx.Payload)
-       TXIDhex := hex.EncodeToString(TXID[:])
-       tx.TXhash = TXID[:]
-
-       //serialize the whole transaction
-       serializedTX, _ := json.Marshal(tx)
-       base64EncodedTX := base64.StdEncoding.EncodeToString(serializedTX)
-
-       body := 
strings.NewReader("{\"jsonrpc\":\"2.0\",\"id\":\"anything\",\"method\":\"broadcast_tx_commit\",\"params\":
 {\"tx\": \"" + base64EncodedTX + "\"}}")
-       url := "http://"; + node + ""
-
-       req, err := http.NewRequest("POST", url, body)
-       if err != nil {
-               print("Error posting to Blockchain")
-               return "", err
-       }
-       req.Header.Set("Content-Type", "text/plain;")
-
-       resp, err := http.DefaultClient.Do(req)
-       if err != nil {
-               print("Error posting to Blockchain")
-               return "", err
-       }
-       defer resp.Body.Close()
-       fmt.Printf("POST TO CHAIN: METHOD:%s CALLS:%s  - TXID:%s\n", method, 
tx.Processor, TXIDhex)
-       return TXIDhex, nil
-}
-
-//HandleChainTX -
-func HandleChainTX(myID string, tx string) error {
-       blockChainTX, err := decodeChainTX(tx)
-       if err != nil {
-               return err
-       }
-       panic(nil)
-       err = callNextTX(nil, blockChainTX, "5556")
-       if err != nil {
-               return err
-       }
-       return nil
-}
-
-//DecodeChainTX - Decode the On Chain TX into a BlockChainTX object
-func decodeChainTX(payload string) (*api.BlockChainTX, error) {
-       base64DecodedTX, _ := base64.StdEncoding.DecodeString(payload)
-       tx := &api.BlockChainTX{}
-
-       err := json.Unmarshal(base64DecodedTX, tx)
-       if err != nil {
-               return &api.BlockChainTX{}, err
-       }
-       return tx, nil
-}
-
-//DecodeChainTX - Decode the On Chain TX into a BlockChainTX object
-func decodeTX(payload string) (*api.BlockChainTX, string, error) {
-       tx := &api.BlockChainTX{}
-       parts := strings.SplitN(payload, "=", 2)
-       if len(parts) != 2 {
-               return &api.BlockChainTX{}, "", errors.New("Invalid TX payload")
-       }
-       hash := string(parts[0])
-       err := json.Unmarshal([]byte(parts[1]), tx)
-       if err != nil {
-               return &api.BlockChainTX{}, "", err
-       }
-       return tx, hash, nil
-}
-
-func callNextTX(svc service.Service, tx *api.BlockChainTX, listenPort string) 
error {
-       switch tx.Processor {
-       case "none":
-               return nil
-       case "dump":
-               svc.Dump(tx)
-       case "v1/fulfill/order":
-               svc.FulfillOrder(tx)
-       case "v1/order2":
-               svc.Order2(tx)
-       case "v1/fulfill/order/secret":
-               svc.FulfillOrderSecret(tx)
-       case "v1/order/secret2":
-               svc.OrderSecret2(tx)
-
-       default:
-               return errors.New("Unknown processor")
-       }
-       return nil
-}
-
-//DumpTXID -
-func DumpTXID(txid string) {
-       value, raw := QueryChain(txid)
-       println(value)
-       bc, _ := decodeChainTX(raw)
-       println(string(bc.Payload))
-       println()
-}
-
-//ProcessTransactionID -
-func ProcessTransactionID(txid string) {
-       _, payload := QueryChain((txid))
-       err := HandleChainTX("", payload)
-       if err != nil {
-               panic(err)
-       }
-}
-
-func unique(stringSlice []string) []string {
-       keys := make(map[string]bool)
-       list := []string{}
-       for _, entry := range stringSlice {
-               if _, value := keys[entry]; !value {
-                       keys[entry] = true
-                       list = append(list, entry)
-               }
-       }
-       return list
-}
diff --git a/pkg/tendermint/tendermint_test.go 
b/pkg/tendermint/tendermint_test.go
deleted file mode 100644
index 495f74f..0000000
--- a/pkg/tendermint/tendermint_test.go
+++ /dev/null
@@ -1,60 +0,0 @@
-package tendermint
-
-import "testing"
-
-var (
-       nodeID = "QmT4y4MtV5mvPHkFjfUQYQ7h1WvAagMy2GTJCn2bF8DQb7"
-)
-
-func Test_Order1(t *testing.T) {
-       a := 
"eyJQcm9jZXNzb3IiOiJ2MS9mdWxmaWxsL29yZGVyIiwiU2VuZGVySUQiOiJRbVQ0eTRNdFY1bXZQSGtGamZVUVlRN2gxV3ZBYWdNeTJHVEpDbjJiRjhEUWI3IiwiUmVjaXBpZW50SUQiOiJRbVQ0eTRNdFY1bXZQSGtGamZVUVlRN2gxV3ZBYWdNeTJHVEpDbjJiRjhEUWI3IiwiUGF5bG9hZCI6ImV5SnZjbVJsY2xCaGNuUXhRMGxFSWpvaVVXMVpVRU5xVEVGME1tbzVVbWhxU0U1TVkwRnVObEF5WTJseVJHWjZTRlpFWTBwMFkzbGtUVFZ5VWxoM1V5SXNJbVJ2WTNWdFpXNTBRMGxFSWpvaVVXMVVOSGswVFhSV05XMTJVRWhyUm1wbVZWRlpVVGRvTVZkMlFXRm5UWGt5UjFSS1EyNHlZa1k0UkZGaU55SjkifQ=="
-       err := HandleChainTX(nodeID, a)
-       if err != nil {
-               panic(err)
-       }
-}
-
-func Test_FullFill(t *testing.T) {
-       a := 
"eyJQcm9jZXNzb3IiOiJPUkRFUl9SRVNQT05TRSIsIlNlbmRlcklEIjoiUW1UNHk0TXRWNW12UEhrRmpmVVFZUTdoMVd2QWFnTXkyR1RKQ24yYkY4RFFiNyIsIlJlY2lwaWVudElEIjoiUW1UNHk0TXRWNW12UEhrRmpmVVFZUTdoMVd2QWFnTXkyR1RKQ24yYkY4RFFiNyIsIlBheWxvYWQiOiJleUp2Y21SbGNsQmhjblF5UTBsRUlqb2lVVzFVZUZka1ltZEdhRGxHYWpGMlJIbFhlazVCWkROVmFuRjNlVEYyTkRsRlFtVjJhRzUyTVVWdk5HVllSaUo5In0="
-       err := HandleChainTX(nodeID, a)
-       if err != nil {
-               panic(err)
-       }
-
-}
-
-func Test_DumpTXID(t *testing.T) {
-       a := "5fe5823c0d8b6d49f2ac99c90575566962ac3a14a6b2f1e7fe7ea1099b7b3bbd"
-       value, raw := QueryChain(a)
-       println(value)
-       bc, _ := decodeChainTX(raw)
-       print(string(bc.Payload))
-}
-
-//Use this to generate Order1
-//curl -s -X POST "http://localhost:5556/v1/order1"; -H "accept: */*" -H 
"Content-Type: application/json" -d 
"{\"beneficiaryIDDocumentCID\":\"\",\"extension\":{\"coin\":\"0\"}}"
-
-func Test_All(t *testing.T) {
-       
DumpTXID("dea1396bce7890f85da7dc86b4ece5c4d372886ed08948eca6a0beee36c412e0")
-
-}
-
-func Test_1(t *testing.T) {
-       txid := 
"473407b069ff917b110f38c36d5b9e5246b5ace5d82df38c5a188d5ac868cfec"
-       DumpTXID(txid)
-       ProcessTransactionID(txid)
-}
-
-func Test_2(t *testing.T) {
-       txid := 
"586bc14b15a31999571c8188241beef046d3b78a9481ecee984e7c76a1d95112"
-       DumpTXID(txid)
-       ProcessTransactionID(txid)
-}
-
-func Test_3(t *testing.T) {
-       txid := 
"5a48129fd272f2a8c57fdd96716a78c3be55a3cf811b179e82e54221d95ccbc4"
-       DumpTXID(txid)
-       ProcessTransactionID(txid)
-}
-
-//curl -s -X POST "http://localhost:5556/v1/order1"; -H "accept: */*" -H 
"Content-Type: application/json" -d 
"{\"beneficiaryIDDocumentCID\":\"\",\"extension\":{\"coin\":\"0\"}}"
diff --git a/pkg/tendermint/websockets.go b/pkg/tendermint/websockets.go
deleted file mode 100644
index 472ea95..0000000
--- a/pkg/tendermint/websockets.go
+++ /dev/null
@@ -1,204 +0,0 @@
-package tendermint
-
-import (
-       "context"
-       "encoding/json"
-       "fmt"
-       "net/http"
-       "os"
-       "os/signal"
-       "strconv"
-       "syscall"
-
-       "github.com/apache/incubator-milagro-dta/libs/datastore"
-       "github.com/apache/incubator-milagro-dta/libs/logger"
-       "github.com/apache/incubator-milagro-dta/pkg/api"
-       "github.com/apache/incubator-milagro-dta/pkg/service"
-       status "github.com/apache/incubator-milagro-dta/pkg/tendermint/status"
-       "github.com/pkg/errors"
-       tmclient "github.com/tendermint/tendermint/rpc/client"
-       ctypes "github.com/tendermint/tendermint/rpc/core/types"
-       tmtypes "github.com/tendermint/tendermint/types"
-)
-
-func catchUp(quene chan tmtypes.Tx, store *datastore.Store, logger 
*logger.Logger, nodeID string, listenPort string, height int) error {
-       print("catch up")
-       return nil
-}
-
-//Subscribe to Websocket and add to queue
-func subscribeAndQueue(queueWaiting chan api.BlockChainTX, logger 
*logger.Logger, nodeID string, listenPort string, blockchainNode string) error {
-       client := tmclient.NewHTTP("tcp://"+blockchainNode+"", "/websocket")
-       //client.SetLogger(tmlogger)
-       err := client.Start()
-       if err != nil {
-               logger.Info("Failed to start Tendermint HTTP client %s", err)
-               return err
-       }
-       defer client.Stop()
-
-       //curl 
"34.246.173.153:26657/tx_search?query=\"tag.part=4%20AND%20tag.reference='579a2864-e100-11e9-aaf4-acde48001122'\""
-       query := "tag.recipient='" + nodeID + "'"
-       //query := "tm.event = 'Tx'"
-
-       out, err := client.Subscribe(context.Background(), "test", query, 1000)
-       if err != nil {
-               logger.Info("Failed to subscribe to query %s %s", query, err)
-               return err
-       }
-
-       logger.Info("Tendermint: Connected")
-
-       quit := make(chan os.Signal, 1)
-       signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
-       for {
-               select {
-               case result := <-out:
-                       tx := result.Data.(tmtypes.EventDataTx).Tx
-                       payload := api.BlockChainTX{}
-                       err := json.Unmarshal(tx, &payload)
-                       if err != nil {
-                               logger.Info("******** Invalid TX - ignored")
-                               break
-                       }
-
-                       //check if this node is in receipient list
-                       if payload.RecipientID != nodeID {
-                               logger.Info("******** Invalid Recipient - why 
are we receiving this TX?")
-                               break
-
-                       }
-
-                       //Add into the waitingQueue for later processing
-                       queueWaiting <- payload
-                       //fmt.Printf("Incoming Transaction:%d \n", 
len(queueWaiting))
-
-               case <-quit:
-                       os.Exit(0)
-               }
-       }
-       return nil
-}
-
-func TXbyHash(TXHash string) (api.BlockChainTX, error) {
-       client := tmclient.NewHTTP("tcp://"+node+"", "/websocket")
-       query := fmt.Sprintf("tag.txhash='%s'", TXHash)
-       result, err := client.TxSearch(query, true, 1, 1)
-
-       if len(result.Txs) == 0 {
-               return api.BlockChainTX{}, errors.New("Not found")
-       }
-
-       payload := api.BlockChainTX{}
-       err = json.Unmarshal(result.Txs[0].Tx, &payload)
-
-       _ = payload
-
-       if err != nil {
-               return payload, err
-       }
-       //
-       // res := result.Txs[0]
-       // tx := res.Tx
-       return payload, nil
-
-}
-
-//loadAllHistoricTX - load the history for this node into a queue
-func loadAllHistoricTX(start int, end int, txHistory []ctypes.ResultTx, nodeID 
string, listenPort string) error {
-       //cycle through the historic transactions page by page
-       //Get all transactions that claim to be from me - check signatures
-       //Get all transactions that claim to be to me -
-
-       client := tmclient.NewHTTP("tcp://"+node+"", "/websocket")
-       currentPage := 1
-       query := fmt.Sprintf("tag.recipient='%v' AND tag.sender='%v' AND 
tx.height>%d AND tx.height<=%d", nodeID, nodeID, start, end)
-       numPerPage := 5
-
-       for {
-               result, err := client.TxSearch(query, true, currentPage, 
numPerPage)
-               if err != nil {
-                       return errors.New("Failed to query chain for 
transaction history")
-               }
-
-               for _, tx := range result.Txs {
-                       txHistory = append(txHistory, *tx)
-               }
-               if currentPage*numPerPage > result.TotalCount {
-                       break
-               }
-               currentPage++
-       }
-       parseHistory(txHistory)
-       return nil
-}
-
-func parseHistory(txHistory []ctypes.ResultTx) {
-       txCount := len(txHistory)
-
-       //loop backwards
-       for i := txCount - 1; i >= 0; i-- {
-               resTx := txHistory[i]
-               tx := resTx.Tx
-
-               //Decode TX into BlockchainTX Object
-               payload := api.BlockChainTX{}
-               err := json.Unmarshal(tx, &payload)
-               if err != nil {
-                       msg := fmt.Sprintf("Invalid Transaction Hash:%v 
Height:%v Index:% \n", resTx.Hash, resTx.Height, resTx.Index)
-                       print(msg)
-                       continue
-               }
-               //Decode BlockchainTX.payload into Protobuffer Qredo
-               // TODO:
-               // Parse the incoming TX, check sig
-               // If from self, can assume correct
-               // builds transaction chains using previous transactionHash
-               // Ensure every
-               // Check recipient/sender in tags are correct
-               //
-               _ = payload
-       }
-       print("Finished loading - but not parsing the History\n")
-}
-
-func processTXQueue(svc service.Service, queue chan api.BlockChainTX, 
listenPort string) {
-       print("Processing queue\n")
-       for payload := range queue {
-               callNextTX(svc, &payload, listenPort)
-       }
-}
-
-//Subscribe - Connect to the Tendermint websocket to collect events
-func Subscribe(svc service.Service, store *datastore.Store, logger 
*logger.Logger, nodeID string, listenPort string) error {
-
-       latestStatus, _ := getChainStatus(node)
-       currentBlockHeight, err := 
strconv.Atoi(latestStatus.Result.SyncInfo.LatestBlockHeight)
-
-       if err != nil {
-               return errors.New("Failed to obtain latest blockheight of 
Blockchain")
-       }
-
-       var processedToHeight int
-       store.Get("chain", "height", &processedToHeight)
-
-       //first catch up to Tip of chain
-       var txHistory []ctypes.ResultTx
-       queueWaiting := make(chan api.BlockChainTX, 1000)
-
-       //while we are processessing the history save all new transactions in a 
queue for later
-       go subscribeAndQueue(queueWaiting, logger, nodeID, listenPort, node)
-       loadAllHistoricTX(processedToHeight, currentBlockHeight, txHistory, 
nodeID, listenPort)
-       processTXQueue(svc, queueWaiting, listenPort)
-       return nil
-}
-
-func getChainStatus(node string) (status.StatusResponse, error) {
-       resp, err := http.Get("http://"; + node + "/status")
-       result := status.StatusResponse{}
-       if err != nil {
-               return result, err
-       }
-       json.NewDecoder(resp.Body).Decode((&result))
-       return result, nil
-}

Reply via email to