This is an automated email from the ASF dual-hosted git repository.

cmorris pushed a commit to branch splitroles-blockchain
in repository https://gitbox.apache.org/repos/asf/incubator-milagro-dta.git

commit 27d062afb7fe6a046fa26f707caffb8eac09e29f
Author: Christopher Morris <[email protected]>
AuthorDate: Fri Sep 6 11:08:14 2019 +0100

    Working Order
---
 cmd/service/__debug_bin           | Bin 0 -> 51482340 bytes
 cmd/service/main.go               |  48 +++------------------
 go.mod                            |   1 +
 go.sum                            |   3 ++
 pkg/defaultservice/order.go       |   2 -
 pkg/tendermint/tendermint.go      |  47 ++++++++++++++++----
 pkg/tendermint/tendermint_test.go |   3 +-
 pkg/tendermint/websockets.go      |  88 ++++++++++++++++++++++++++++++++++++++
 8 files changed, 138 insertions(+), 54 deletions(-)

diff --git a/cmd/service/__debug_bin b/cmd/service/__debug_bin
new file mode 100755
index 0000000..c1d3957
Binary files /dev/null and b/cmd/service/__debug_bin differ
diff --git a/cmd/service/main.go b/cmd/service/main.go
index e177b38..877c6c5 100644
--- a/cmd/service/main.go
+++ b/cmd/service/main.go
@@ -21,7 +21,6 @@ Package main - handles config, initialisation and starts the 
service daemon
 package main
 
 import (
-       "context"
        "crypto/rand"
        "fmt"
        "net/http"
@@ -38,16 +37,12 @@ import (
        "github.com/apache/incubator-milagro-dta/pkg/api"
        "github.com/apache/incubator-milagro-dta/pkg/config"
        "github.com/apache/incubator-milagro-dta/pkg/endpoints"
+       "github.com/apache/incubator-milagro-dta/pkg/tendermint"
        "github.com/apache/incubator-milagro-dta/plugins"
        "github.com/go-kit/kit/metrics/prometheus"
        "github.com/pkg/errors"
        stdprometheus "github.com/prometheus/client_golang/prometheus"
        "github.com/prometheus/client_golang/prometheus/promhttp"
-
-       log2 "github.com/tendermint/tendermint/libs/log"
-       tmclient "github.com/tendermint/tendermint/rpc/client"
-       tmtypes "github.com/tendermint/tendermint/types"
-       "github.com/tendermint/tmlibs/log"
 )
 
 func initConfig(args []string) error {
@@ -151,6 +146,11 @@ func startDaemon(args []string) error {
                return errors.Wrap(err, "init logger")
        }
 
+       go tendermint.Subscribe(logger)
+       if err != nil {
+               return errors.Wrap(err, "init Tendermint Blockchain")
+       }
+
        // Create KV store
        logger.Info("Datastore type: %s", cfg.Node.Datastore)
        store, err := initDataStore(cfg.Node.Datastore)
@@ -284,7 +284,7 @@ func initDataStore(ds string) (*datastore.Store, error) {
 
 func main() {
        var err error
-       go listenTendermint()
+
        cmd, args := parseCommand()
        switch cmd {
        default:
@@ -300,37 +300,3 @@ func main() {
                os.Exit(1)
        }
 }
-
-func connectTendermintWS() {
-
-       logger := log2.NewTMLogger(log.NewSyncWriter(os.Stdout))
-
-       client := tmclient.NewHTTP("tcp://localhost:26657", "/websocket")
-       client.SetLogger(logger)
-       err := client.Start()
-       if err != nil {
-               logger.Error("Failed to start a client", "err", err)
-               os.Exit(1)
-       }
-       defer client.Stop()
-
-       query := "tm.event = 'Tx'"
-       out, err := client.Subscribe(context.Background(), "test", query, 1000)
-       if err != nil {
-               logger.Error("Failed to subscribe to query", "err", err, 
"query", query)
-               os.Exit(1)
-       }
-
-       quit := make(chan os.Signal, 1)
-       signal.Notify(quit, os.Interrupt, syscall.SIGTERM)
-
-       for {
-               select {
-               case result := <-out:
-                       logger.Info("got tx",
-                               "index", 
result.Data.(tmtypes.EventDataTx).Index)
-               case <-quit:
-                       os.Exit(0)
-               }
-       }
-}
diff --git a/go.mod b/go.mod
index b0e50bd..44503b6 100644
--- a/go.mod
+++ b/go.mod
@@ -1,6 +1,7 @@
 module github.com/apache/incubator-milagro-dta
 
 require (
+       github.com/TylerBrock/colorjson v0.0.0-20180527164720-95ec53f28296
        github.com/btcsuite/btcd v0.0.0-20190427004231-96897255fd17
        github.com/btcsuite/btcutil v0.0.0-20190425235716-9e5f4b9a998d
        github.com/coreos/go-oidc v2.0.0+incompatible
diff --git a/go.sum b/go.sum
index dccddba..94d113f 100644
--- a/go.sum
+++ b/go.sum
@@ -13,6 +13,8 @@ github.com/StackExchange/wmi 
v0.0.0-20180116203802-5d049714c4a6/go.mod h1:3eOhrU
 github.com/Stebalien/go-bitfield v0.0.0-20180330043415-076a62f9ce6e/go.mod 
h1:3oM7gXIttpYDAJXpVNnSCiUMYBLIZ6cb1t+Ip982MRo=
 github.com/Stebalien/go-bitfield v0.0.1 
h1:X3kbSSPUaJK60wV2hjOPZwmpljr6VGCqdq4cBLhbQBo=
 github.com/Stebalien/go-bitfield v0.0.1/go.mod 
h1:GNjFpasyUVkHMsfEOk8EFLJ9syQ6SI+XWrX9Wf2XH0s=
+github.com/TylerBrock/colorjson v0.0.0-20180527164720-95ec53f28296 
h1:JYWTroLXcNzSCgu66NMgdjwoMHQRbv2SoOVNFb4kRkE=
+github.com/TylerBrock/colorjson v0.0.0-20180527164720-95ec53f28296/go.mod 
h1:VSw57q4QFiWDbRnjdX8Cb3Ow0SFncRw+bA/ofY6Q83w=
 github.com/VividCortex/gohistogram v1.0.0/go.mod 
h1:Pf5mBqqDxYaXu3hDrrU+w6nw50o/4+TcAqDqk/vUH7g=
 github.com/aead/siphash v1.0.1/go.mod 
h1:Nywa3cDsYNNK3gaciGTWPwHt0wlpNV15vwmswBAUSII=
 github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod 
h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
@@ -72,6 +74,7 @@ github.com/elgris/jsondiff 
v0.0.0-20160530203242-765b5c24c302/go.mod h1:qBlWZqWe
 github.com/etcd-io/bbolt v1.3.3/go.mod 
h1:ZF2nL25h33cCyBtcyWeZ2/I3HQOfTP+0PIEvHjkjCrw=
 github.com/facebookgo/atomicfile v0.0.0-20151019160806-2de1f203e7d5/go.mod 
h1:JpoxHjuQauoxiFMl1ie8Xc/7TfLuMZ5eOCONd1sUBHg=
 github.com/fatih/color v1.6.0/go.mod 
h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4=
+github.com/fatih/color v1.7.0 h1:DkWD4oS2D8LGGgTQ6IvwJJXSL5Vp2ffcQg58nFV38Ys=
 github.com/fatih/color v1.7.0/go.mod 
h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4=
 github.com/fd/go-nat v1.0.0/go.mod 
h1:BTBu/CKvMmOMUPkKVef1pngt2WFH/lg7E6yQnulfp6E=
 github.com/fortytw2/leaktest v1.2.0/go.mod 
h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g=
diff --git a/pkg/defaultservice/order.go b/pkg/defaultservice/order.go
index 46dfb16..8bb1488 100644
--- a/pkg/defaultservice/order.go
+++ b/pkg/defaultservice/order.go
@@ -433,7 +433,5 @@ func (s *Service) Order2(req *api.FulfillOrderResponse) 
(string, error) {
                Payload:     marshaledRequest,
        }
        //curl --data-binary 
'{"jsonrpc":"2.0","id":"anything","method":"broadcast_tx_commit","params": 
{"tx": "YWFhcT1hYWFxCg=="}}' -H 'content-type:text/plain;' 
http://localhost:26657
-
        return tendermint.PostToChain(chainTX, "Order2")
-
 }
diff --git a/pkg/tendermint/tendermint.go b/pkg/tendermint/tendermint.go
index 286f1da..5427ba1 100644
--- a/pkg/tendermint/tendermint.go
+++ b/pkg/tendermint/tendermint.go
@@ -6,11 +6,13 @@ import (
        "encoding/base64"
        "encoding/hex"
        "encoding/json"
+       "errors"
        "fmt"
        "net/http"
        "os"
        "strings"
 
+       "github.com/TylerBrock/colorjson"
        "github.com/apache/incubator-milagro-dta/pkg/api"
 )
 
@@ -44,7 +46,7 @@ func PostToChain(payload *api.BlockChainTX, method string) 
(string, error) {
        TXIDhex := hex.EncodeToString(TXID[:])
        fullTx := fmt.Sprintf("%s=%s", TXIDhex, string(serializedTX))
 
-       fmt.Printf(" **** %s Block TX: %s\n", method, TXIDhex)
+       //fmt.Printf(" **** %s Block TX: %s\n", method, TXIDhex)
        base64EncodedTX := base64.StdEncoding.EncodeToString([]byte(fullTx))
        body := 
strings.NewReader("{\"jsonrpc\":\"2.0\",\"id\":\"anything\",\"method\":\"broadcast_tx_commit\",\"params\":
 {\"tx\": \"" + base64EncodedTX + "\"}}")
        req, err := http.NewRequest("POST", "http://localhost:26657";, body)
@@ -69,12 +71,6 @@ func HandleChainTX(myID string, tx string) error {
        if err != nil {
                return err
        }
-
-       if blockChainTX.Processor == "NONE" {
-               print("Process Complete")
-               print(string(blockChainTX.Payload))
-               return nil
-       }
        err = callNextTX(blockChainTX)
        if err != nil {
                return err
@@ -94,12 +90,32 @@ func decodeChainTX(payload string) (*api.BlockChainTX, 
error) {
        return tx, nil
 }
 
+//DecodeChainTX - Decode the On Chain TX into a BlockChainTX object
+func decodeTX(payload string) (*api.BlockChainTX, string, error) {
+       tx := &api.BlockChainTX{}
+       parts := strings.SplitN(payload, "=", 2)
+       if len(parts) != 2 {
+               return &api.BlockChainTX{}, "", errors.New("Invalid TX payload")
+       }
+       hash := string(parts[0])
+       err := json.Unmarshal([]byte(parts[1]), tx)
+       if err != nil {
+               return &api.BlockChainTX{}, "", err
+       }
+       return tx, hash, nil
+}
+
 func callNextTX(tx *api.BlockChainTX) error {
        // recipient := tx.RecipientID
        // sender := tx.SenderID
        //payloadJSON := tx.Payload
        payloadString := string(tx.Payload)
 
+       if tx.Processor == "NONE" {
+               //The TX is information and doesn't require any further 
processing
+               return nil
+       }
+
        desintationURL := fmt.Sprintf("http://localhost:5556/%s";, tx.Processor)
 
        body := strings.NewReader(payloadString)
@@ -122,11 +138,24 @@ func callNextTX(tx *api.BlockChainTX) error {
                t += scanner.Text()
                ///fmt.Print(scanner.Text())
        }
-       print(t)
-
        return nil
 }
 
+//Decode the Payload into JSON and displays the entire Blockchain TX unencoded
+func DumpTX(bctx *api.BlockChainTX) {
+       f := colorjson.NewFormatter()
+       f.Indent = 4
+       var payloadObj map[string]interface{}
+       payload := bctx.Payload
+       json.Unmarshal([]byte(payload), &payloadObj)
+       jsonstring, _ := json.Marshal(bctx)
+       var obj map[string]interface{}
+       json.Unmarshal([]byte(jsonstring), &obj)
+       obj["Payload"] = payloadObj
+       s, _ := f.Marshal(obj)
+       fmt.Println(string(s))
+}
+
 func DumpTXID(txid string) {
        value, raw := QueryChain(txid)
        println(value)
diff --git a/pkg/tendermint/tendermint_test.go 
b/pkg/tendermint/tendermint_test.go
index 43dec6b..ebe5899 100644
--- a/pkg/tendermint/tendermint_test.go
+++ b/pkg/tendermint/tendermint_test.go
@@ -35,8 +35,7 @@ func Test_DumpTXID(t *testing.T) {
 //curl -s -X POST "http://localhost:5556/v1/order1"; -H "accept: */*" -H 
"Content-Type: application/json" -d 
"{\"beneficiaryIDDocumentCID\":\"\",\"extension\":{\"coin\":\"0\"}}"
 
 func Test_All(t *testing.T) {
-       
DumpTXID("473407b069ff917b110f38c36d5b9e5246b5ace5d82df38c5a188d5ac868cfec")
-       
DumpTXID("586bc14b15a31999571c8188241beef046d3b78a9481ecee984e7c76a1d95112")
+       
DumpTXID("8a47801e99b29e48a38c74495573e1eb68fa675b294826ce0580dabcadce3b0e")
 
 }
 
diff --git a/pkg/tendermint/websockets.go b/pkg/tendermint/websockets.go
new file mode 100644
index 0000000..3296c78
--- /dev/null
+++ b/pkg/tendermint/websockets.go
@@ -0,0 +1,88 @@
+package tendermint
+
+import (
+       "context"
+       "os"
+       "os/signal"
+       "syscall"
+
+       "github.com/apache/incubator-milagro-dta/libs/logger"
+       tmclient "github.com/tendermint/tendermint/rpc/client"
+       tmtypes "github.com/tendermint/tendermint/types"
+)
+
+//Subscribe - Connect to the Tendermint websocket to collect events
+func Subscribe(logger *logger.Logger) error {
+
+       //tmlogger := log2.NewTMLogger(log.NewSyncWriter(os.Stdout))
+
+       client := tmclient.NewHTTP("tcp://localhost:26657", "/websocket")
+       //client.SetLogger(tmlogger)
+       err := client.Start()
+       if err != nil {
+               logger.Info("Failed to start Tendermint HTTP client %s", err)
+               return err
+       }
+       defer client.Stop()
+
+       query := "tm.event = 'Tx'"
+       out, err := client.Subscribe(context.Background(), "test", query, 1000)
+       if err != nil {
+               logger.Info("Failed to subscribe to query %s %s", query, err)
+               return err
+       }
+
+       logger.Info("Tendermint: Connected")
+
+       quit := make(chan os.Signal, 1)
+       signal.Notify(quit, os.Interrupt, syscall.SIGTERM)
+
+       for {
+               select {
+               case result := <-out:
+
+                       tx := result.Data.(tmtypes.EventDataTx).Tx
+
+                       blockchainTX, txid, err := decodeTX(string(tx))
+
+                       logger.Info("Incoming TX %s", txid)
+
+                       if err != nil {
+                               logger.Info("Invalid Incoming Transaction %s - 
%s:", err, string(tx))
+
+                       }
+
+                       if blockchainTX.Processor == "NONE" {
+                               DumpTX(blockchainTX)
+                       } else {
+                               callNextTX(blockchainTX)
+                       }
+
+                       //print(blockchainTX)
+                       // print(txid)
+
+                       // print(string(xx))
+
+                       // a := result.Data.(tmtypes.EventDataTx).Index
+                       // b := result.Data.(tmtypes.EventDataTx)
+                       // c := b.TxResult
+                       // tx := c.Tx
+                       // txdata := []byte(c.Tx)
+                       // print(string(txdata))
+
+                       // print(a)
+                       // Use(c, b, tx)
+
+                       //logger.Info("got tx","index", 
result.Data.(tmtypes.EventDataTx).Index)
+               case <-quit:
+                       os.Exit(0)
+               }
+       }
+}
+
+//Use - helper to remove warnings
+func Use(vals ...interface{}) {
+       for _, val := range vals {
+               _ = val
+       }
+}

Reply via email to