This is an automated email from the ASF dual-hosted git repository.

jiashuo pushed a commit to branch add-table-migrator
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git

commit 0ea696591c1ab6cae8af3954f9b8c58229fb01ff
Author: jiashuo <[email protected]>
AuthorDate: Mon Jun 13 15:19:22 2022 +0800

    init
---
 admin-cli/executor/data_version.go                 |  66 +++++++++++++
 admin-cli/executor/server_config.go                | 109 ++++++---------------
 .../executor/toolkits/tablemigrator/migrator.go    |  11 +++
 admin-cli/util/http_client.go                      |  58 +++++++++++
 4 files changed, 163 insertions(+), 81 deletions(-)

diff --git a/admin-cli/executor/data_version.go 
b/admin-cli/executor/data_version.go
new file mode 100644
index 00000000..97c9d8c9
--- /dev/null
+++ b/admin-cli/executor/data_version.go
@@ -0,0 +1,66 @@
+package executor
+
+import (
+       "encoding/json"
+       "fmt"
+       "github.com/apache/incubator-pegasus/admin-cli/util"
+       "github.com/apache/incubator-pegasus/go-client/session"
+)
+
+type TableDataVersion struct {
+       DataVersion string `json:"data_version"`
+}
+
+func QueryTableVersion(client *Client, table string) error {
+       version, err := QueryReplicaDataVersion(client, table)
+       if err != nil {
+               return nil
+       }
+
+       // formats into JSON
+       outputBytes, _ := json.MarshalIndent(version, "", "  ")
+       fmt.Fprintln(client, string(outputBytes))
+       return nil
+}
+
+func QueryReplicaDataVersion(client *Client, table string) (*TableDataVersion, 
error) {
+       nodes := client.Nodes.GetAllNodes(session.NodeTypeReplica)
+       resp, err := client.Meta.QueryConfig(table)
+       if err != nil {
+               return nil, err
+       }
+
+       args := util.Arguments{
+               Name:  "app_id",
+               Value: string(resp.AppID),
+       }
+       results := util.BatchCallHTTP(nodes, getTableDataVersion, args)
+
+       var finalVersion string
+       var version TableDataVersion
+       for _, result := range results {
+               if result.Err != nil {
+                       return nil, result.Err
+               }
+               err := json.Unmarshal([]byte(result.Resp), &version)
+               if err != nil {
+                       return nil, err
+               }
+
+               if finalVersion == "" {
+                       finalVersion = version.DataVersion
+               } else {
+                       if version.DataVersion == finalVersion {
+                               continue
+                       } else {
+                               return nil, fmt.Errorf("replica versions are 
not consistent!")
+                       }
+               }
+       }
+       return &version, nil
+}
+
+func getTableDataVersion(addr string, args util.Arguments) (string, error) {
+       url := fmt.Sprintf("http://%s/replica/data_version?%s=%s";, args.Name, 
args.Value)
+       return util.CallHTTPGet(url)
+}
diff --git a/admin-cli/executor/server_config.go 
b/admin-cli/executor/server_config.go
index f17cf27c..e857d702 100644
--- a/admin-cli/executor/server_config.go
+++ b/admin-cli/executor/server_config.go
@@ -20,26 +20,20 @@
 package executor
 
 import (
-       "context"
        "encoding/json"
        "fmt"
        "sort"
        "strings"
-       "sync"
-       "time"
 
        "github.com/apache/incubator-pegasus/admin-cli/util"
        "github.com/apache/incubator-pegasus/go-client/session"
-       "github.com/go-resty/resty/v2"
 )
 
-type httpRequest func(addr string, cmd command) (string, error)
-
-// map[*util.PegasusNode]*cmdResult is not sorted, pass nodes is for print 
sorted result
-type printResponse func(nodeType session.NodeType, sortedNodeList []string, 
resp map[string]*cmdResult)
+// map[*util.PegasusNode]*util.Result is not sorted, pass nodes is for print 
sorted result
+type printResponse func(nodeType session.NodeType, sortedNodeList []string, 
resp map[string]*util.Result)
 
 type action struct {
-       request httpRequest
+       request util.HttpRequest
        print   printResponse
 }
 
@@ -55,11 +49,6 @@ var sectionsMap = map[session.NodeType]string{
        // TODO(jiashuo1) support collector
 }
 
-type command struct {
-       name  string
-       value string
-}
-
 type response struct {
        Name    string
        Section string
@@ -67,11 +56,6 @@ type response struct {
        Value   string
 }
 
-type cmdResult struct {
-       resp string
-       err  error
-}
-
 //TODO(jiashuo1) not support update collector config
 func ConfigCommand(client *Client, nodeType session.NodeType, nodeAddr string, 
name string, actionType string, value string) error {
        var nodes []*util.PegasusNode
@@ -87,11 +71,11 @@ func ConfigCommand(client *Client, nodeType 
session.NodeType, nodeAddr string, n
        }
 
        if ac, ok := actionsMap[actionType]; ok {
-               cmd := command{
-                       name:  name,
-                       value: value,
+               cmd := util.Arguments{
+                       Name:  name,
+                       Value: value,
                }
-               results := batchCallHTTP(nodes, ac.request, cmd)
+               results := util.BatchCallHTTP(nodes, ac.request, cmd)
 
                var sortedNodeList []string
                for _, n := range nodes {
@@ -106,59 +90,22 @@ func ConfigCommand(client *Client, nodeType 
session.NodeType, nodeAddr string, n
        return nil
 }
 
-func batchCallHTTP(nodes []*util.PegasusNode, request httpRequest, cmd 
command) map[string]*cmdResult {
-       results := make(map[string]*cmdResult)
-
-       var mu sync.Mutex
-       var wg sync.WaitGroup
-       wg.Add(len(nodes))
-       for _, n := range nodes {
-               go func(node *util.PegasusNode) {
-                       _, cancel := context.WithTimeout(context.Background(), 
time.Second*10)
-                       defer cancel()
-                       result, err := request(node.TCPAddr(), cmd)
-                       mu.Lock()
-                       if err != nil {
-                               results[node.CombinedAddr()] = &cmdResult{err: 
err}
-                       } else {
-                               results[node.CombinedAddr()] = &cmdResult{resp: 
result}
-                       }
-                       mu.Unlock()
-                       wg.Done()
-               }(n)
-       }
-       wg.Wait()
-
-       return results
-}
-
-func callHTTP(url string) (string, error) {
-       resp, err := resty.New().SetTimeout(time.Second * 10).R().Get(url)
-       if err != nil {
-               return "", fmt.Errorf("failed to call \"%s\": %s", url, err)
-       }
-       if resp.StatusCode() != 200 {
-               return "", fmt.Errorf("failed to call \"%s\": code=%d", url, 
resp.StatusCode())
-       }
-       return string(resp.Body()), nil
-}
-
-func listConfig(addr string, cmd command) (string, error) {
+func listConfig(addr string, cmd util.Arguments) (string, error) {
        url := fmt.Sprintf("http://%s/configs";, addr)
-       return callHTTP(url)
+       return util.CallHTTPGet(url)
 }
 
-func printConfigList(nodeType session.NodeType, sortedNodeList []string, 
results map[string]*cmdResult) {
+func printConfigList(nodeType session.NodeType, sortedNodeList []string, 
results map[string]*util.Result) {
        fmt.Printf("CMD: list \n")
        for _, node := range sortedNodeList {
                cmdRes := results[node]
-               if cmdRes.err != nil {
-                       fmt.Printf("[%s] %s\n", node, cmdRes.err)
+               if cmdRes.Err != nil {
+                       fmt.Printf("[%s] %s\n", node, cmdRes.Err)
                        continue
                }
 
                var respMap map[string]response
-               err := json.Unmarshal([]byte(cmdRes.resp), &respMap)
+               err := json.Unmarshal([]byte(cmdRes.Resp), &respMap)
                if err != nil {
                        fmt.Printf("[%s] %s\n", node, err)
                        continue
@@ -178,24 +125,24 @@ func printConfigList(nodeType session.NodeType, 
sortedNodeList []string, results
        }
 }
 
-func getConfig(addr string, cmd command) (string, error) {
-       url := fmt.Sprintf("http://%s/config?name=%s";, addr, cmd.name)
-       return callHTTP(url)
+func getConfig(addr string, cmd util.Arguments) (string, error) {
+       url := fmt.Sprintf("http://%s/config?name=%s";, addr, cmd.Name)
+       return util.CallHTTPGet(url)
 }
 
-func printConfigValue(nodeType session.NodeType, sortedNodeList []string, 
results map[string]*cmdResult) {
+func printConfigValue(nodeType session.NodeType, sortedNodeList []string, 
results map[string]*util.Result) {
        fmt.Printf("CMD: get \n")
        for _, node := range sortedNodeList {
                cmdRes := results[node]
-               if cmdRes.err != nil {
-                       fmt.Printf("[%s] %s\n", node, cmdRes.err)
+               if cmdRes.Err != nil {
+                       fmt.Printf("[%s] %s\n", node, cmdRes.Err)
                        continue
                }
 
                var resp response
-               err := json.Unmarshal([]byte(cmdRes.resp), &resp)
+               err := json.Unmarshal([]byte(cmdRes.Resp), &resp)
                if err != nil {
-                       fmt.Printf("[%s] %s\n", node, cmdRes.resp)
+                       fmt.Printf("[%s] %s\n", node, cmdRes.Resp)
                        continue
                }
 
@@ -208,22 +155,22 @@ func printConfigValue(nodeType session.NodeType, 
sortedNodeList []string, result
        }
 }
 
-func updateConfig(addr string, cmd command) (string, error) {
-       url := fmt.Sprintf("http://%s/updateConfig?%s=%s";, addr, cmd.name, 
cmd.value)
-       return callHTTP(url)
+func updateConfig(addr string, cmd util.Arguments) (string, error) {
+       url := fmt.Sprintf("http://%s/updateConfig?%s=%s";, addr, cmd.Name, 
cmd.Value)
+       return util.CallHTTPGet(url)
 }
 
-func printConfigUpdate(nodeType session.NodeType, sortedNodeList []string, 
results map[string]*cmdResult) {
+func printConfigUpdate(nodeType session.NodeType, sortedNodeList []string, 
results map[string]*util.Result) {
        fmt.Printf("CMD: set \n")
        for _, node := range sortedNodeList {
                cmdRes := results[node]
-               if cmdRes.err != nil {
-                       fmt.Printf("[%s] %s\n", node, cmdRes.err)
+               if cmdRes.Err != nil {
+                       fmt.Printf("[%s] %s\n", node, cmdRes.Err)
                        continue
                }
 
                var resMap map[string]string
-               err := json.Unmarshal([]byte(cmdRes.resp), &resMap)
+               err := json.Unmarshal([]byte(cmdRes.Resp), &resMap)
                if err != nil {
                        fmt.Printf("[%s] %s\n", node, err)
                        continue
diff --git a/admin-cli/executor/toolkits/tablemigrator/migrator.go 
b/admin-cli/executor/toolkits/tablemigrator/migrator.go
new file mode 100644
index 00000000..e18c3744
--- /dev/null
+++ b/admin-cli/executor/toolkits/tablemigrator/migrator.go
@@ -0,0 +1,11 @@
+package tablemigrator
+
+/**
+1. check data version
+2. create table duplication
+3. check confirm decree if < 5k
+4. set env config deny write request
+5. check confirm decree if == 0
+6. switch table env addrs
+7. set env config deny and re-config
+*/
diff --git a/admin-cli/util/http_client.go b/admin-cli/util/http_client.go
new file mode 100644
index 00000000..473c6fb7
--- /dev/null
+++ b/admin-cli/util/http_client.go
@@ -0,0 +1,58 @@
+package util
+
+import (
+       "context"
+       "fmt"
+       "github.com/go-resty/resty/v2"
+       "sync"
+       "time"
+)
+
+type Arguments struct {
+       Name  string
+       Value string
+}
+
+type Result struct {
+       Resp string
+       Err  error
+}
+
+type HttpRequest func(addr string, args Arguments) (string, error)
+
+func BatchCallHTTP(nodes []*PegasusNode, request HttpRequest, args Arguments) 
map[string]*Result {
+       results := make(map[string]*Result)
+
+       var mu sync.Mutex
+       var wg sync.WaitGroup
+       wg.Add(len(nodes))
+       for _, n := range nodes {
+               go func(node *PegasusNode) {
+                       _, cancel := context.WithTimeout(context.Background(), 
time.Second*10)
+                       defer cancel()
+                       result, err := request(node.TCPAddr(), args)
+                       mu.Lock()
+                       if err != nil {
+                               results[node.CombinedAddr()] = &Result{Err: err}
+                       } else {
+                               results[node.CombinedAddr()] = &Result{Resp: 
result}
+                       }
+                       mu.Unlock()
+                       wg.Done()
+               }(n)
+       }
+       wg.Wait()
+
+       return results
+}
+
+func CallHTTPGet(url string) (string, error) {
+       resp, err := resty.New().SetTimeout(time.Second * 10).R().Get(url)
+       if err != nil {
+               return "", fmt.Errorf("failed to call \"%s\": %s", url, err)
+       }
+       if resp.StatusCode() != 200 {
+               return "", fmt.Errorf("failed to call \"%s\": code=%d", url, 
resp.StatusCode())
+       }
+       return string(resp.Body()), nil
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to