This is an automated email from the ASF dual-hosted git repository. cmorris pushed a commit to branch splitroles-blockchain in repository https://gitbox.apache.org/repos/asf/incubator-milagro-dta.git
commit d4cd78d36467d98b99c311da8828c1837ab57f8b Author: Christopher Morris <[email protected]> AuthorDate: Fri Sep 27 14:24:59 2019 +0100 Pull history from chain --- pkg/tendermint/query/queryresponse.go | 53 +++++++ pkg/tendermint/status/statusresponse.go | 66 ++++++++ pkg/tendermint/websockets.go | 261 +++++++++++++++++++++++++++----- 3 files changed, 346 insertions(+), 34 deletions(-) diff --git a/pkg/tendermint/query/queryresponse.go b/pkg/tendermint/query/queryresponse.go new file mode 100644 index 0000000..5babe6c --- /dev/null +++ b/pkg/tendermint/query/queryresponse.go @@ -0,0 +1,53 @@ +package query + +import "encoding/json" + +func UnmarshalQueryResponse(data []byte) (QueryResponse, error) { + var r QueryResponse + err := json.Unmarshal(data, &r) + return r, err +} + +func (r *QueryResponse) Marshal() ([]byte, error) { + return json.Marshal(r) +} + +type QueryResponse struct { + Jsonrpc string `json:"jsonrpc"` + ID string `json:"id"` + Result Result `json:"result"` +} + +type Result struct { + Txs []Tx `json:"txs"` + TotalCount string `json:"total_count"` +} + +type Tx struct { + Hash string `json:"hash"` + Height string `json:"height"` + Index int64 `json:"index"` + TxResult TxResult `json:"tx_result"` + Tx string `json:"tx"` +} + +type TxResult struct { + Code int64 `json:"code"` + Data interface{} `json:"data"` + Log string `json:"log"` + Info string `json:"info"` + GasWanted string `json:"gasWanted"` + GasUsed string `json:"gasUsed"` + Events []Event `json:"events"` + Codespace string `json:"codespace"` +} + +type Event struct { + Type string `json:"type"` + Attributes []Attribute `json:"attributes"` +} + +type Attribute struct { + Key string `json:"key"` + Value string `json:"value"` +} diff --git a/pkg/tendermint/status/statusresponse.go b/pkg/tendermint/status/statusresponse.go new file mode 100644 index 0000000..89fa228 --- /dev/null +++ b/pkg/tendermint/status/statusresponse.go @@ -0,0 +1,66 @@ +package status + +import "encoding/json" + +func UnmarshalStatusResponse(data []byte) (StatusResponse, error) { + var r StatusResponse + err := json.Unmarshal(data, &r) + return r, err +} + +func (r *StatusResponse) Marshal() ([]byte, error) { + return json.Marshal(r) +} + +type StatusResponse struct { + Jsonrpc string `json:"jsonrpc"` + ID string `json:"id"` + Result Result `json:"result"` +} + +type Result struct { + NodeInfo NodeInfo `json:"node_info"` + SyncInfo SyncInfo `json:"sync_info"` + ValidatorInfo ValidatorInfo `json:"validator_info"` +} + +type NodeInfo struct { + ProtocolVersion ProtocolVersion `json:"protocol_version"` + ID string `json:"id"` + ListenAddr string `json:"listen_addr"` + Network string `json:"network"` + Version string `json:"version"` + Channels string `json:"channels"` + Moniker string `json:"moniker"` + Other Other `json:"other"` +} + +type Other struct { + TxIndex string `json:"tx_index"` + RPCAddress string `json:"rpc_address"` +} + +type ProtocolVersion struct { + P2P string `json:"p2p"` + Block string `json:"block"` + App string `json:"app"` +} + +type SyncInfo struct { + LatestBlockHash string `json:"latest_block_hash"` + LatestAppHash string `json:"latest_app_hash"` + LatestBlockHeight string `json:"latest_block_height"` + LatestBlockTime string `json:"latest_block_time"` + CatchingUp bool `json:"catching_up"` +} + +type ValidatorInfo struct { + Address string `json:"address"` + PubKey PubKey `json:"pub_key"` + VotingPower string `json:"voting_power"` +} + +type PubKey struct { + Type string `json:"type"` + Value string `json:"value"` +} diff --git a/pkg/tendermint/websockets.go b/pkg/tendermint/websockets.go index aba13de..cb724f1 100644 --- a/pkg/tendermint/websockets.go +++ b/pkg/tendermint/websockets.go @@ -2,30 +2,31 @@ package tendermint import ( "context" - "encoding/hex" "encoding/json" + "fmt" + "net/http" "os" "os/signal" + "strconv" "syscall" "github.com/apache/incubator-milagro-dta/libs/datastore" "github.com/apache/incubator-milagro-dta/libs/logger" "github.com/apache/incubator-milagro-dta/pkg/api" + status "github.com/apache/incubator-milagro-dta/pkg/tendermint/status" + "github.com/pkg/errors" tmclient "github.com/tendermint/tendermint/rpc/client" + ctypes "github.com/tendermint/tendermint/rpc/core/types" tmtypes "github.com/tendermint/tendermint/types" ) -func catchUp(store *datastore.Store, logger *logger.Logger, nodeID string, listenPort string) error { - +func catchUp(quene chan tmtypes.Tx, store *datastore.Store, logger *logger.Logger, nodeID string, listenPort string, height int) error { + print("catch up") return nil } -//Subscribe - Connect to the Tendermint websocket to collect events -func Subscribe(store *datastore.Store, logger *logger.Logger, nodeID string, listenPort string) error { - - //first catch up to Tip of chain - //catchUp(store, logger, nodeID, listenPort) - +//Subscribe to Websocket and add to queue +func subscribeAndQueue(queueWaiting chan api.BlockChainTX, logger *logger.Logger, nodeID string, listenPort string) error { client := tmclient.NewHTTP("tcp://"+node+"", "/websocket") //client.SetLogger(tmlogger) err := client.Start() @@ -49,7 +50,6 @@ func Subscribe(store *datastore.Store, logger *logger.Logger, nodeID string, lis quit := make(chan os.Signal, 1) signal.Notify(quit, os.Interrupt, syscall.SIGTERM) - for { select { case result := <-out: @@ -61,12 +61,12 @@ func Subscribe(store *datastore.Store, logger *logger.Logger, nodeID string, lis break } - //check if this node is Sender + //check if this node is Sender - if so we don't need to process it if payload.SenderID == nodeID { break } - //check is receipient + //check if this node is in receipient list isRecipient := false for _, v := range payload.RecipientID { if v == nodeID { @@ -75,41 +75,224 @@ func Subscribe(store *datastore.Store, logger *logger.Logger, nodeID string, lis } } + //If not in recipient list do nothing if isRecipient == false { logger.Info("******** Invalid Recipient - why are we receiving this TX?") break } - //blockchainTX, txid, err := decodeTX(string(tx)) - TXIDhex := hex.EncodeToString(payload.TXhash[:]) - logger.Info("Incoming TXHash:%s . Processor:%s", TXIDhex, payload.Processor) + //Add into the waitingQueue for later processing + queueWaiting <- payload + fmt.Printf("Incoming Transaction:%d \n", len(queueWaiting)) - if payload.Processor == "NONE" { - DumpTX(&payload) - } else { - callNextTX(&payload, listenPort) - } + // //blockchainTX, txid, err := decodeTX(string(tx)) + // TXIDhex := hex.EncodeToString(payload.TXhash[:]) + // logger.Info("Incoming TXHash:%s . Processor:%s", TXIDhex, payload.Processor) - //print(blockchainTX) - // print(txid) + // if payload.Processor == "NONE" { + // DumpTX(&payload) + // } else { + // callNextTX(&payload, listenPort) + // } - // print(string(xx)) + case <-quit: + os.Exit(0) + } + } + return nil +} - // a := result.Data.(tmtypes.EventDataTx).Index - // b := result.Data.(tmtypes.EventDataTx) - // c := b.TxResult - // tx := c.Tx - // txdata := []byte(c.Tx) - // print(string(txdata)) +//loadAllHistoricTX - load the history for this node into a queue +func loadAllHistoricTX(start int, end int, txHistory []ctypes.ResultTx, nodeID string, listenPort string) error { + //cycle through the historic transactions page by page + //Get all transactions that claim to be from me - check signatures + //Get all transactions that claim to be to me - - // print(a) - // Use(c, b, tx) + client := tmclient.NewHTTP("tcp://"+node+"", "/websocket") + currentPage := 1 + query := fmt.Sprintf("tag.recipient='%v' AND tag.sender='%v' AND tx.height>%d AND tx.height<=%d", nodeID, nodeID, start, end) + numPerPage := 5 - //logger.Info("got tx","index", result.Data.(tmtypes.EventDataTx).Index) - case <-quit: - os.Exit(0) + for { + result, err := client.TxSearch(query, true, currentPage, numPerPage) + if err != nil { + return errors.New("Failed to query chain for transaction history") } + + for _, tx := range result.Txs { + txHistory = append(txHistory, *tx) + } + if currentPage*numPerPage > result.TotalCount { + break + } + currentPage++ + } + parseHistory(txHistory) + return nil +} + +func parseHistory(txHistory []ctypes.ResultTx) { + txCount := len(txHistory) + + //loop backwards + for i := txCount - 1; i >= 0; i-- { + resTx := txHistory[i] + tx := resTx.Tx + + //Decode TX into BlockchainTX Object + payload := api.BlockChainTX{} + err := json.Unmarshal(tx, &payload) + if err != nil { + msg := fmt.Sprintf("Invalid Transaction Hash:%v Height:%v Index:% \n", resTx.Hash, resTx.Height, resTx.Index) + print(msg) + continue + } + //Decode BlockchainTX.payload into Protobuffer Qredo + // TODO: + // Parse the incoming TX, check sig + // If from self, can assume correct + // builds transaction chains using previous transactionHash + // Ensure every + // Check recipient/sender in tags are correct + // + _ = payload } + print("Finished loading - but not parsing the History\n") +} + +func processTXQueue(queue chan api.BlockChainTX, listenPort string) { + print("Processing queue\n") + for payload := range queue { + //blockchainTX, txid, err := decodeTX(string(tx)) + //TXIDhex := hex.EncodeToString(payload.TXhash[:]) + // logger.Info("Incoming TXHash:%s . Processor:%s", TXIDhex, payload.Processor) + + if payload.Processor == "NONE" { + DumpTX(&payload) + } else { + callNextTX(&payload, listenPort) + } + } +} + +//Subscribe - Connect to the Tendermint websocket to collect events +func Subscribe(store *datastore.Store, logger *logger.Logger, nodeID string, listenPort string) error { + //Subscribe to channel + //Get height + + latestStatus, _ := getChainStatus(node) + currentBlockHeight, err := strconv.Atoi(latestStatus.Result.SyncInfo.LatestBlockHeight) + + if err != nil { + return errors.New("Failed to obtain latest blockheight of Blockchain") + } + + var processedToHeight int + store.Get("chain", "height", &processedToHeight) + + //first catch up to Tip of chain + var txHistory []ctypes.ResultTx + queueWaiting := make(chan api.BlockChainTX, 1000) + + //while we are processessing the history save all new transactions in a queue for later + + go subscribeAndQueue(queueWaiting, logger, nodeID, listenPort) + + loadAllHistoricTX(processedToHeight, currentBlockHeight, txHistory, nodeID, listenPort) + + processTXQueue(queueWaiting, listenPort) + + // var height int + // store.Get("chain", "height", &height) + + // catchUp(queue, store, logger, nodeID, listenPort, height) + // return nil + + // client := tmclient.NewHTTP("tcp://"+node+"", "/websocket") + // //client.SetLogger(tmlogger) + // err := client.Start() + // if err != nil { + // logger.Info("Failed to start Tendermint HTTP client %s", err) + // return err + // } + // defer client.Stop() + + // //curl "34.246.173.153:26657/tx_search?query=\"tag.part=4%20AND%20tag.reference='579a2864-e100-11e9-aaf4-acde48001122'\"" + // query := "tag.recipient='" + nodeID + "'" + // //query := "tm.event = 'Tx'" + + // out, err := client.Subscribe(context.Background(), "test", query, 1000) + // if err != nil { + // logger.Info("Failed to subscribe to query %s %s", query, err) + // return err + // } + + // logger.Info("Tendermint: Connected") + + // quit := make(chan os.Signal, 1) + // signal.Notify(quit, os.Interrupt, syscall.SIGTERM) + + // for { + // select { + // case result := <-out: + // tx := result.Data.(tmtypes.EventDataTx).Tx + // payload := api.BlockChainTX{} + // err := json.Unmarshal(tx, &payload) + // if err != nil { + // logger.Info("******** Invalid TX - ignored") + // break + // } + + // //check if this node is Sender + // if payload.SenderID == nodeID { + // break + // } + + // //check is receipient + // isRecipient := false + // for _, v := range payload.RecipientID { + // if v == nodeID { + // isRecipient = true + // break + // } + // } + + // if isRecipient == false { + // logger.Info("******** Invalid Recipient - why are we receiving this TX?") + // break + // } + + // //blockchainTX, txid, err := decodeTX(string(tx)) + // TXIDhex := hex.EncodeToString(payload.TXhash[:]) + // logger.Info("Incoming TXHash:%s . Processor:%s", TXIDhex, payload.Processor) + + // if payload.Processor == "NONE" { + // DumpTX(&payload) + // } else { + // callNextTX(&payload, listenPort) + // } + + // //print(blockchainTX) + // // print(txid) + + // // print(string(xx)) + + // // a := result.Data.(tmtypes.EventDataTx).Index + // // b := result.Data.(tmtypes.EventDataTx) + // // c := b.TxResult + // // tx := c.Tx + // // txdata := []byte(c.Tx) + // // print(string(txdata)) + + // // print(a) + // // Use(c, b, tx) + + // //logger.Info("got tx","index", result.Data.(tmtypes.EventDataTx).Index) + // case <-quit: + // os.Exit(0) + // } + // } + return nil } //Use - helper to remove warnings @@ -118,3 +301,13 @@ func Use(vals ...interface{}) { _ = val } } + +func getChainStatus(node string) (status.StatusResponse, error) { + resp, err := http.Get("http://" + node + "/status") + result := status.StatusResponse{} + if err != nil { + return result, err + } + json.NewDecoder(resp.Body).Decode((&result)) + return result, nil +}
