This is an automated email from the ASF dual-hosted git repository.

cmorris pushed a commit to branch splitroles-blockchain
in repository https://gitbox.apache.org/repos/asf/incubator-milagro-dta.git

commit d4cd78d36467d98b99c311da8828c1837ab57f8b
Author: Christopher Morris <[email protected]>
AuthorDate: Fri Sep 27 14:24:59 2019 +0100

    Pull history from chain
---
 pkg/tendermint/query/queryresponse.go   |  53 +++++++
 pkg/tendermint/status/statusresponse.go |  66 ++++++++
 pkg/tendermint/websockets.go            | 261 +++++++++++++++++++++++++++-----
 3 files changed, 346 insertions(+), 34 deletions(-)

diff --git a/pkg/tendermint/query/queryresponse.go 
b/pkg/tendermint/query/queryresponse.go
new file mode 100644
index 0000000..5babe6c
--- /dev/null
+++ b/pkg/tendermint/query/queryresponse.go
@@ -0,0 +1,53 @@
+package query
+
+import "encoding/json"
+
+func UnmarshalQueryResponse(data []byte) (QueryResponse, error) {
+       var r QueryResponse
+       err := json.Unmarshal(data, &r)
+       return r, err
+}
+
+func (r *QueryResponse) Marshal() ([]byte, error) {
+       return json.Marshal(r)
+}
+
+type QueryResponse struct {
+       Jsonrpc string `json:"jsonrpc"`
+       ID      string `json:"id"`
+       Result  Result `json:"result"`
+}
+
+type Result struct {
+       Txs        []Tx   `json:"txs"`
+       TotalCount string `json:"total_count"`
+}
+
+type Tx struct {
+       Hash     string   `json:"hash"`
+       Height   string   `json:"height"`
+       Index    int64    `json:"index"`
+       TxResult TxResult `json:"tx_result"`
+       Tx       string   `json:"tx"`
+}
+
+type TxResult struct {
+       Code      int64       `json:"code"`
+       Data      interface{} `json:"data"`
+       Log       string      `json:"log"`
+       Info      string      `json:"info"`
+       GasWanted string      `json:"gasWanted"`
+       GasUsed   string      `json:"gasUsed"`
+       Events    []Event     `json:"events"`
+       Codespace string      `json:"codespace"`
+}
+
+type Event struct {
+       Type       string      `json:"type"`
+       Attributes []Attribute `json:"attributes"`
+}
+
+type Attribute struct {
+       Key   string `json:"key"`
+       Value string `json:"value"`
+}
diff --git a/pkg/tendermint/status/statusresponse.go 
b/pkg/tendermint/status/statusresponse.go
new file mode 100644
index 0000000..89fa228
--- /dev/null
+++ b/pkg/tendermint/status/statusresponse.go
@@ -0,0 +1,66 @@
+package status
+
+import "encoding/json"
+
+func UnmarshalStatusResponse(data []byte) (StatusResponse, error) {
+       var r StatusResponse
+       err := json.Unmarshal(data, &r)
+       return r, err
+}
+
+func (r *StatusResponse) Marshal() ([]byte, error) {
+       return json.Marshal(r)
+}
+
+type StatusResponse struct {
+       Jsonrpc string `json:"jsonrpc"`
+       ID      string `json:"id"`
+       Result  Result `json:"result"`
+}
+
+type Result struct {
+       NodeInfo      NodeInfo      `json:"node_info"`
+       SyncInfo      SyncInfo      `json:"sync_info"`
+       ValidatorInfo ValidatorInfo `json:"validator_info"`
+}
+
+type NodeInfo struct {
+       ProtocolVersion ProtocolVersion `json:"protocol_version"`
+       ID              string          `json:"id"`
+       ListenAddr      string          `json:"listen_addr"`
+       Network         string          `json:"network"`
+       Version         string          `json:"version"`
+       Channels        string          `json:"channels"`
+       Moniker         string          `json:"moniker"`
+       Other           Other           `json:"other"`
+}
+
+type Other struct {
+       TxIndex    string `json:"tx_index"`
+       RPCAddress string `json:"rpc_address"`
+}
+
+type ProtocolVersion struct {
+       P2P   string `json:"p2p"`
+       Block string `json:"block"`
+       App   string `json:"app"`
+}
+
+type SyncInfo struct {
+       LatestBlockHash   string `json:"latest_block_hash"`
+       LatestAppHash     string `json:"latest_app_hash"`
+       LatestBlockHeight string `json:"latest_block_height"`
+       LatestBlockTime   string `json:"latest_block_time"`
+       CatchingUp        bool   `json:"catching_up"`
+}
+
+type ValidatorInfo struct {
+       Address     string `json:"address"`
+       PubKey      PubKey `json:"pub_key"`
+       VotingPower string `json:"voting_power"`
+}
+
+type PubKey struct {
+       Type  string `json:"type"`
+       Value string `json:"value"`
+}
diff --git a/pkg/tendermint/websockets.go b/pkg/tendermint/websockets.go
index aba13de..cb724f1 100644
--- a/pkg/tendermint/websockets.go
+++ b/pkg/tendermint/websockets.go
@@ -2,30 +2,31 @@ package tendermint
 
 import (
        "context"
-       "encoding/hex"
        "encoding/json"
+       "fmt"
+       "net/http"
        "os"
        "os/signal"
+       "strconv"
        "syscall"
 
        "github.com/apache/incubator-milagro-dta/libs/datastore"
        "github.com/apache/incubator-milagro-dta/libs/logger"
        "github.com/apache/incubator-milagro-dta/pkg/api"
+       status "github.com/apache/incubator-milagro-dta/pkg/tendermint/status"
+       "github.com/pkg/errors"
        tmclient "github.com/tendermint/tendermint/rpc/client"
+       ctypes "github.com/tendermint/tendermint/rpc/core/types"
        tmtypes "github.com/tendermint/tendermint/types"
 )
 
-func catchUp(store *datastore.Store, logger *logger.Logger, nodeID string, 
listenPort string) error {
-
+func catchUp(quene chan tmtypes.Tx, store *datastore.Store, logger 
*logger.Logger, nodeID string, listenPort string, height int) error {
+       print("catch up")
        return nil
 }
 
-//Subscribe - Connect to the Tendermint websocket to collect events
-func Subscribe(store *datastore.Store, logger *logger.Logger, nodeID string, 
listenPort string) error {
-
-       //first catch up to Tip of chain
-       //catchUp(store, logger, nodeID, listenPort)
-
+//Subscribe to Websocket and add to queue
+func subscribeAndQueue(queueWaiting chan api.BlockChainTX, logger 
*logger.Logger, nodeID string, listenPort string) error {
        client := tmclient.NewHTTP("tcp://"+node+"", "/websocket")
        //client.SetLogger(tmlogger)
        err := client.Start()
@@ -49,7 +50,6 @@ func Subscribe(store *datastore.Store, logger *logger.Logger, 
nodeID string, lis
 
        quit := make(chan os.Signal, 1)
        signal.Notify(quit, os.Interrupt, syscall.SIGTERM)
-
        for {
                select {
                case result := <-out:
@@ -61,12 +61,12 @@ func Subscribe(store *datastore.Store, logger 
*logger.Logger, nodeID string, lis
                                break
                        }
 
-                       //check if this node is Sender
+                       //check if this node is Sender - if so we don't need to 
process it
                        if payload.SenderID == nodeID {
                                break
                        }
 
-                       //check is receipient
+                       //check if this node is in receipient list
                        isRecipient := false
                        for _, v := range payload.RecipientID {
                                if v == nodeID {
@@ -75,41 +75,224 @@ func Subscribe(store *datastore.Store, logger 
*logger.Logger, nodeID string, lis
                                }
                        }
 
+                       //If not in recipient list do nothing
                        if isRecipient == false {
                                logger.Info("******** Invalid Recipient - why 
are we receiving this TX?")
                                break
                        }
 
-                       //blockchainTX, txid, err := decodeTX(string(tx))
-                       TXIDhex := hex.EncodeToString(payload.TXhash[:])
-                       logger.Info("Incoming TXHash:%s . Processor:%s", 
TXIDhex, payload.Processor)
+                       //Add into the waitingQueue for later processing
+                       queueWaiting <- payload
+                       fmt.Printf("Incoming Transaction:%d \n", 
len(queueWaiting))
 
-                       if payload.Processor == "NONE" {
-                               DumpTX(&payload)
-                       } else {
-                               callNextTX(&payload, listenPort)
-                       }
+                       // //blockchainTX, txid, err := decodeTX(string(tx))
+                       // TXIDhex := hex.EncodeToString(payload.TXhash[:])
+                       // logger.Info("Incoming TXHash:%s . Processor:%s", 
TXIDhex, payload.Processor)
 
-                       //print(blockchainTX)
-                       // print(txid)
+                       // if payload.Processor == "NONE" {
+                       //      DumpTX(&payload)
+                       // } else {
+                       //      callNextTX(&payload, listenPort)
+                       // }
 
-                       // print(string(xx))
+               case <-quit:
+                       os.Exit(0)
+               }
+       }
+       return nil
+}
 
-                       // a := result.Data.(tmtypes.EventDataTx).Index
-                       // b := result.Data.(tmtypes.EventDataTx)
-                       // c := b.TxResult
-                       // tx := c.Tx
-                       // txdata := []byte(c.Tx)
-                       // print(string(txdata))
+//loadAllHistoricTX - load the history for this node into a queue
+func loadAllHistoricTX(start int, end int, txHistory []ctypes.ResultTx, nodeID 
string, listenPort string) error {
+       //cycle through the historic transactions page by page
+       //Get all transactions that claim to be from me - check signatures
+       //Get all transactions that claim to be to me -
 
-                       // print(a)
-                       // Use(c, b, tx)
+       client := tmclient.NewHTTP("tcp://"+node+"", "/websocket")
+       currentPage := 1
+       query := fmt.Sprintf("tag.recipient='%v' AND tag.sender='%v' AND 
tx.height>%d AND tx.height<=%d", nodeID, nodeID, start, end)
+       numPerPage := 5
 
-                       //logger.Info("got tx","index", 
result.Data.(tmtypes.EventDataTx).Index)
-               case <-quit:
-                       os.Exit(0)
+       for {
+               result, err := client.TxSearch(query, true, currentPage, 
numPerPage)
+               if err != nil {
+                       return errors.New("Failed to query chain for 
transaction history")
                }
+
+               for _, tx := range result.Txs {
+                       txHistory = append(txHistory, *tx)
+               }
+               if currentPage*numPerPage > result.TotalCount {
+                       break
+               }
+               currentPage++
+       }
+       parseHistory(txHistory)
+       return nil
+}
+
+func parseHistory(txHistory []ctypes.ResultTx) {
+       txCount := len(txHistory)
+
+       //loop backwards
+       for i := txCount - 1; i >= 0; i-- {
+               resTx := txHistory[i]
+               tx := resTx.Tx
+
+               //Decode TX into BlockchainTX Object
+               payload := api.BlockChainTX{}
+               err := json.Unmarshal(tx, &payload)
+               if err != nil {
+                       msg := fmt.Sprintf("Invalid Transaction Hash:%v 
Height:%v Index:% \n", resTx.Hash, resTx.Height, resTx.Index)
+                       print(msg)
+                       continue
+               }
+               //Decode BlockchainTX.payload into Protobuffer Qredo
+               // TODO:
+               // Parse the incoming TX, check sig
+               // If from self, can assume correct
+               // builds transaction chains using previous transactionHash
+               // Ensure every
+               // Check recipient/sender in tags are correct
+               //
+               _ = payload
        }
+       print("Finished loading - but not parsing the History\n")
+}
+
+func processTXQueue(queue chan api.BlockChainTX, listenPort string) {
+       print("Processing queue\n")
+       for payload := range queue {
+               //blockchainTX, txid, err := decodeTX(string(tx))
+               //TXIDhex := hex.EncodeToString(payload.TXhash[:])
+               //      logger.Info("Incoming TXHash:%s . Processor:%s", 
TXIDhex, payload.Processor)
+
+               if payload.Processor == "NONE" {
+                       DumpTX(&payload)
+               } else {
+                       callNextTX(&payload, listenPort)
+               }
+       }
+}
+
+//Subscribe - Connect to the Tendermint websocket to collect events
+func Subscribe(store *datastore.Store, logger *logger.Logger, nodeID string, 
listenPort string) error {
+       //Subscribe to channel
+       //Get height
+
+       latestStatus, _ := getChainStatus(node)
+       currentBlockHeight, err := 
strconv.Atoi(latestStatus.Result.SyncInfo.LatestBlockHeight)
+
+       if err != nil {
+               return errors.New("Failed to obtain latest blockheight of 
Blockchain")
+       }
+
+       var processedToHeight int
+       store.Get("chain", "height", &processedToHeight)
+
+       //first catch up to Tip of chain
+       var txHistory []ctypes.ResultTx
+       queueWaiting := make(chan api.BlockChainTX, 1000)
+
+       //while we are processessing the history save all new transactions in a 
queue for later
+
+       go subscribeAndQueue(queueWaiting, logger, nodeID, listenPort)
+
+       loadAllHistoricTX(processedToHeight, currentBlockHeight, txHistory, 
nodeID, listenPort)
+
+       processTXQueue(queueWaiting, listenPort)
+
+       // var height int
+       // store.Get("chain", "height", &height)
+
+       // catchUp(queue, store, logger, nodeID, listenPort, height)
+       // return nil
+
+       // client := tmclient.NewHTTP("tcp://"+node+"", "/websocket")
+       // //client.SetLogger(tmlogger)
+       // err := client.Start()
+       // if err != nil {
+       //      logger.Info("Failed to start Tendermint HTTP client %s", err)
+       //      return err
+       // }
+       // defer client.Stop()
+
+       // //curl 
"34.246.173.153:26657/tx_search?query=\"tag.part=4%20AND%20tag.reference='579a2864-e100-11e9-aaf4-acde48001122'\""
+       // query := "tag.recipient='" + nodeID + "'"
+       // //query := "tm.event = 'Tx'"
+
+       // out, err := client.Subscribe(context.Background(), "test", query, 
1000)
+       // if err != nil {
+       //      logger.Info("Failed to subscribe to query %s %s", query, err)
+       //      return err
+       // }
+
+       // logger.Info("Tendermint: Connected")
+
+       // quit := make(chan os.Signal, 1)
+       // signal.Notify(quit, os.Interrupt, syscall.SIGTERM)
+
+       // for {
+       //      select {
+       //      case result := <-out:
+       //              tx := result.Data.(tmtypes.EventDataTx).Tx
+       //              payload := api.BlockChainTX{}
+       //              err := json.Unmarshal(tx, &payload)
+       //              if err != nil {
+       //                      logger.Info("******** Invalid TX - ignored")
+       //                      break
+       //              }
+
+       //              //check if this node is Sender
+       //              if payload.SenderID == nodeID {
+       //                      break
+       //              }
+
+       //              //check is receipient
+       //              isRecipient := false
+       //              for _, v := range payload.RecipientID {
+       //                      if v == nodeID {
+       //                              isRecipient = true
+       //                              break
+       //                      }
+       //              }
+
+       //              if isRecipient == false {
+       //                      logger.Info("******** Invalid Recipient - why 
are we receiving this TX?")
+       //                      break
+       //              }
+
+       //              //blockchainTX, txid, err := decodeTX(string(tx))
+       //              TXIDhex := hex.EncodeToString(payload.TXhash[:])
+       //              logger.Info("Incoming TXHash:%s . Processor:%s", 
TXIDhex, payload.Processor)
+
+       //              if payload.Processor == "NONE" {
+       //                      DumpTX(&payload)
+       //              } else {
+       //                      callNextTX(&payload, listenPort)
+       //              }
+
+       //              //print(blockchainTX)
+       //              // print(txid)
+
+       //              // print(string(xx))
+
+       //              // a := result.Data.(tmtypes.EventDataTx).Index
+       //              // b := result.Data.(tmtypes.EventDataTx)
+       //              // c := b.TxResult
+       //              // tx := c.Tx
+       //              // txdata := []byte(c.Tx)
+       //              // print(string(txdata))
+
+       //              // print(a)
+       //              // Use(c, b, tx)
+
+       //              //logger.Info("got tx","index", 
result.Data.(tmtypes.EventDataTx).Index)
+       //      case <-quit:
+       //              os.Exit(0)
+       //      }
+       // }
+       return nil
 }
 
 //Use - helper to remove warnings
@@ -118,3 +301,13 @@ func Use(vals ...interface{}) {
                _ = val
        }
 }
+
+func getChainStatus(node string) (status.StatusResponse, error) {
+       resp, err := http.Get("http://"; + node + "/status")
+       result := status.StatusResponse{}
+       if err != nil {
+               return result, err
+       }
+       json.NewDecoder(resp.Body).Decode((&result))
+       return result, nil
+}

Reply via email to