This is an automated email from the ASF dual-hosted git repository. cmorris pushed a commit to branch splitroles-blockchain in repository https://gitbox.apache.org/repos/asf/incubator-milagro-dta.git
commit 27d062afb7fe6a046fa26f707caffb8eac09e29f Author: Christopher Morris <[email protected]> AuthorDate: Fri Sep 6 11:08:14 2019 +0100 Working Order --- cmd/service/__debug_bin | Bin 0 -> 51482340 bytes cmd/service/main.go | 48 +++------------------ go.mod | 1 + go.sum | 3 ++ pkg/defaultservice/order.go | 2 - pkg/tendermint/tendermint.go | 47 ++++++++++++++++---- pkg/tendermint/tendermint_test.go | 3 +- pkg/tendermint/websockets.go | 88 ++++++++++++++++++++++++++++++++++++++ 8 files changed, 138 insertions(+), 54 deletions(-) diff --git a/cmd/service/__debug_bin b/cmd/service/__debug_bin new file mode 100755 index 0000000..c1d3957 Binary files /dev/null and b/cmd/service/__debug_bin differ diff --git a/cmd/service/main.go b/cmd/service/main.go index e177b38..877c6c5 100644 --- a/cmd/service/main.go +++ b/cmd/service/main.go @@ -21,7 +21,6 @@ Package main - handles config, initialisation and starts the service daemon package main import ( - "context" "crypto/rand" "fmt" "net/http" @@ -38,16 +37,12 @@ import ( "github.com/apache/incubator-milagro-dta/pkg/api" "github.com/apache/incubator-milagro-dta/pkg/config" "github.com/apache/incubator-milagro-dta/pkg/endpoints" + "github.com/apache/incubator-milagro-dta/pkg/tendermint" "github.com/apache/incubator-milagro-dta/plugins" "github.com/go-kit/kit/metrics/prometheus" "github.com/pkg/errors" stdprometheus "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promhttp" - - log2 "github.com/tendermint/tendermint/libs/log" - tmclient "github.com/tendermint/tendermint/rpc/client" - tmtypes "github.com/tendermint/tendermint/types" - "github.com/tendermint/tmlibs/log" ) func initConfig(args []string) error { @@ -151,6 +146,11 @@ func startDaemon(args []string) error { return errors.Wrap(err, "init logger") } + go tendermint.Subscribe(logger) + if err != nil { + return errors.Wrap(err, "init Tendermint Blockchain") + } + // Create KV store logger.Info("Datastore type: %s", cfg.Node.Datastore) store, err := initDataStore(cfg.Node.Datastore) @@ -284,7 +284,7 @@ func initDataStore(ds string) (*datastore.Store, error) { func main() { var err error - go listenTendermint() + cmd, args := parseCommand() switch cmd { default: @@ -300,37 +300,3 @@ func main() { os.Exit(1) } } - -func connectTendermintWS() { - - logger := log2.NewTMLogger(log.NewSyncWriter(os.Stdout)) - - client := tmclient.NewHTTP("tcp://localhost:26657", "/websocket") - client.SetLogger(logger) - err := client.Start() - if err != nil { - logger.Error("Failed to start a client", "err", err) - os.Exit(1) - } - defer client.Stop() - - query := "tm.event = 'Tx'" - out, err := client.Subscribe(context.Background(), "test", query, 1000) - if err != nil { - logger.Error("Failed to subscribe to query", "err", err, "query", query) - os.Exit(1) - } - - quit := make(chan os.Signal, 1) - signal.Notify(quit, os.Interrupt, syscall.SIGTERM) - - for { - select { - case result := <-out: - logger.Info("got tx", - "index", result.Data.(tmtypes.EventDataTx).Index) - case <-quit: - os.Exit(0) - } - } -} diff --git a/go.mod b/go.mod index b0e50bd..44503b6 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,7 @@ module github.com/apache/incubator-milagro-dta require ( + github.com/TylerBrock/colorjson v0.0.0-20180527164720-95ec53f28296 github.com/btcsuite/btcd v0.0.0-20190427004231-96897255fd17 github.com/btcsuite/btcutil v0.0.0-20190425235716-9e5f4b9a998d github.com/coreos/go-oidc v2.0.0+incompatible diff --git a/go.sum b/go.sum index dccddba..94d113f 100644 --- a/go.sum +++ b/go.sum @@ -13,6 +13,8 @@ github.com/StackExchange/wmi v0.0.0-20180116203802-5d049714c4a6/go.mod h1:3eOhrU github.com/Stebalien/go-bitfield v0.0.0-20180330043415-076a62f9ce6e/go.mod h1:3oM7gXIttpYDAJXpVNnSCiUMYBLIZ6cb1t+Ip982MRo= github.com/Stebalien/go-bitfield v0.0.1 h1:X3kbSSPUaJK60wV2hjOPZwmpljr6VGCqdq4cBLhbQBo= github.com/Stebalien/go-bitfield v0.0.1/go.mod h1:GNjFpasyUVkHMsfEOk8EFLJ9syQ6SI+XWrX9Wf2XH0s= +github.com/TylerBrock/colorjson v0.0.0-20180527164720-95ec53f28296 h1:JYWTroLXcNzSCgu66NMgdjwoMHQRbv2SoOVNFb4kRkE= +github.com/TylerBrock/colorjson v0.0.0-20180527164720-95ec53f28296/go.mod h1:VSw57q4QFiWDbRnjdX8Cb3Ow0SFncRw+bA/ofY6Q83w= github.com/VividCortex/gohistogram v1.0.0/go.mod h1:Pf5mBqqDxYaXu3hDrrU+w6nw50o/4+TcAqDqk/vUH7g= github.com/aead/siphash v1.0.1/go.mod h1:Nywa3cDsYNNK3gaciGTWPwHt0wlpNV15vwmswBAUSII= github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= @@ -72,6 +74,7 @@ github.com/elgris/jsondiff v0.0.0-20160530203242-765b5c24c302/go.mod h1:qBlWZqWe github.com/etcd-io/bbolt v1.3.3/go.mod h1:ZF2nL25h33cCyBtcyWeZ2/I3HQOfTP+0PIEvHjkjCrw= github.com/facebookgo/atomicfile v0.0.0-20151019160806-2de1f203e7d5/go.mod h1:JpoxHjuQauoxiFMl1ie8Xc/7TfLuMZ5eOCONd1sUBHg= github.com/fatih/color v1.6.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4= +github.com/fatih/color v1.7.0 h1:DkWD4oS2D8LGGgTQ6IvwJJXSL5Vp2ffcQg58nFV38Ys= github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4= github.com/fd/go-nat v1.0.0/go.mod h1:BTBu/CKvMmOMUPkKVef1pngt2WFH/lg7E6yQnulfp6E= github.com/fortytw2/leaktest v1.2.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g= diff --git a/pkg/defaultservice/order.go b/pkg/defaultservice/order.go index 46dfb16..8bb1488 100644 --- a/pkg/defaultservice/order.go +++ b/pkg/defaultservice/order.go @@ -433,7 +433,5 @@ func (s *Service) Order2(req *api.FulfillOrderResponse) (string, error) { Payload: marshaledRequest, } //curl --data-binary '{"jsonrpc":"2.0","id":"anything","method":"broadcast_tx_commit","params": {"tx": "YWFhcT1hYWFxCg=="}}' -H 'content-type:text/plain;' http://localhost:26657 - return tendermint.PostToChain(chainTX, "Order2") - } diff --git a/pkg/tendermint/tendermint.go b/pkg/tendermint/tendermint.go index 286f1da..5427ba1 100644 --- a/pkg/tendermint/tendermint.go +++ b/pkg/tendermint/tendermint.go @@ -6,11 +6,13 @@ import ( "encoding/base64" "encoding/hex" "encoding/json" + "errors" "fmt" "net/http" "os" "strings" + "github.com/TylerBrock/colorjson" "github.com/apache/incubator-milagro-dta/pkg/api" ) @@ -44,7 +46,7 @@ func PostToChain(payload *api.BlockChainTX, method string) (string, error) { TXIDhex := hex.EncodeToString(TXID[:]) fullTx := fmt.Sprintf("%s=%s", TXIDhex, string(serializedTX)) - fmt.Printf(" **** %s Block TX: %s\n", method, TXIDhex) + //fmt.Printf(" **** %s Block TX: %s\n", method, TXIDhex) base64EncodedTX := base64.StdEncoding.EncodeToString([]byte(fullTx)) body := strings.NewReader("{\"jsonrpc\":\"2.0\",\"id\":\"anything\",\"method\":\"broadcast_tx_commit\",\"params\": {\"tx\": \"" + base64EncodedTX + "\"}}") req, err := http.NewRequest("POST", "http://localhost:26657", body) @@ -69,12 +71,6 @@ func HandleChainTX(myID string, tx string) error { if err != nil { return err } - - if blockChainTX.Processor == "NONE" { - print("Process Complete") - print(string(blockChainTX.Payload)) - return nil - } err = callNextTX(blockChainTX) if err != nil { return err @@ -94,12 +90,32 @@ func decodeChainTX(payload string) (*api.BlockChainTX, error) { return tx, nil } +//DecodeChainTX - Decode the On Chain TX into a BlockChainTX object +func decodeTX(payload string) (*api.BlockChainTX, string, error) { + tx := &api.BlockChainTX{} + parts := strings.SplitN(payload, "=", 2) + if len(parts) != 2 { + return &api.BlockChainTX{}, "", errors.New("Invalid TX payload") + } + hash := string(parts[0]) + err := json.Unmarshal([]byte(parts[1]), tx) + if err != nil { + return &api.BlockChainTX{}, "", err + } + return tx, hash, nil +} + func callNextTX(tx *api.BlockChainTX) error { // recipient := tx.RecipientID // sender := tx.SenderID //payloadJSON := tx.Payload payloadString := string(tx.Payload) + if tx.Processor == "NONE" { + //The TX is information and doesn't require any further processing + return nil + } + desintationURL := fmt.Sprintf("http://localhost:5556/%s", tx.Processor) body := strings.NewReader(payloadString) @@ -122,11 +138,24 @@ func callNextTX(tx *api.BlockChainTX) error { t += scanner.Text() ///fmt.Print(scanner.Text()) } - print(t) - return nil } +//Decode the Payload into JSON and displays the entire Blockchain TX unencoded +func DumpTX(bctx *api.BlockChainTX) { + f := colorjson.NewFormatter() + f.Indent = 4 + var payloadObj map[string]interface{} + payload := bctx.Payload + json.Unmarshal([]byte(payload), &payloadObj) + jsonstring, _ := json.Marshal(bctx) + var obj map[string]interface{} + json.Unmarshal([]byte(jsonstring), &obj) + obj["Payload"] = payloadObj + s, _ := f.Marshal(obj) + fmt.Println(string(s)) +} + func DumpTXID(txid string) { value, raw := QueryChain(txid) println(value) diff --git a/pkg/tendermint/tendermint_test.go b/pkg/tendermint/tendermint_test.go index 43dec6b..ebe5899 100644 --- a/pkg/tendermint/tendermint_test.go +++ b/pkg/tendermint/tendermint_test.go @@ -35,8 +35,7 @@ func Test_DumpTXID(t *testing.T) { //curl -s -X POST "http://localhost:5556/v1/order1" -H "accept: */*" -H "Content-Type: application/json" -d "{\"beneficiaryIDDocumentCID\":\"\",\"extension\":{\"coin\":\"0\"}}" func Test_All(t *testing.T) { - DumpTXID("473407b069ff917b110f38c36d5b9e5246b5ace5d82df38c5a188d5ac868cfec") - DumpTXID("586bc14b15a31999571c8188241beef046d3b78a9481ecee984e7c76a1d95112") + DumpTXID("8a47801e99b29e48a38c74495573e1eb68fa675b294826ce0580dabcadce3b0e") } diff --git a/pkg/tendermint/websockets.go b/pkg/tendermint/websockets.go new file mode 100644 index 0000000..3296c78 --- /dev/null +++ b/pkg/tendermint/websockets.go @@ -0,0 +1,88 @@ +package tendermint + +import ( + "context" + "os" + "os/signal" + "syscall" + + "github.com/apache/incubator-milagro-dta/libs/logger" + tmclient "github.com/tendermint/tendermint/rpc/client" + tmtypes "github.com/tendermint/tendermint/types" +) + +//Subscribe - Connect to the Tendermint websocket to collect events +func Subscribe(logger *logger.Logger) error { + + //tmlogger := log2.NewTMLogger(log.NewSyncWriter(os.Stdout)) + + client := tmclient.NewHTTP("tcp://localhost:26657", "/websocket") + //client.SetLogger(tmlogger) + err := client.Start() + if err != nil { + logger.Info("Failed to start Tendermint HTTP client %s", err) + return err + } + defer client.Stop() + + query := "tm.event = 'Tx'" + out, err := client.Subscribe(context.Background(), "test", query, 1000) + if err != nil { + logger.Info("Failed to subscribe to query %s %s", query, err) + return err + } + + logger.Info("Tendermint: Connected") + + quit := make(chan os.Signal, 1) + signal.Notify(quit, os.Interrupt, syscall.SIGTERM) + + for { + select { + case result := <-out: + + tx := result.Data.(tmtypes.EventDataTx).Tx + + blockchainTX, txid, err := decodeTX(string(tx)) + + logger.Info("Incoming TX %s", txid) + + if err != nil { + logger.Info("Invalid Incoming Transaction %s - %s:", err, string(tx)) + + } + + if blockchainTX.Processor == "NONE" { + DumpTX(blockchainTX) + } else { + callNextTX(blockchainTX) + } + + //print(blockchainTX) + // print(txid) + + // print(string(xx)) + + // a := result.Data.(tmtypes.EventDataTx).Index + // b := result.Data.(tmtypes.EventDataTx) + // c := b.TxResult + // tx := c.Tx + // txdata := []byte(c.Tx) + // print(string(txdata)) + + // print(a) + // Use(c, b, tx) + + //logger.Info("got tx","index", result.Data.(tmtypes.EventDataTx).Index) + case <-quit: + os.Exit(0) + } + } +} + +//Use - helper to remove warnings +func Use(vals ...interface{}) { + for _, val := range vals { + _ = val + } +}
