This is an automated email from the ASF dual-hosted git repository. jiashuo pushed a commit to branch add-table-migrator in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git
commit 0ea696591c1ab6cae8af3954f9b8c58229fb01ff Author: jiashuo <[email protected]> AuthorDate: Mon Jun 13 15:19:22 2022 +0800 init --- admin-cli/executor/data_version.go | 66 +++++++++++++ admin-cli/executor/server_config.go | 109 ++++++--------------- .../executor/toolkits/tablemigrator/migrator.go | 11 +++ admin-cli/util/http_client.go | 58 +++++++++++ 4 files changed, 163 insertions(+), 81 deletions(-) diff --git a/admin-cli/executor/data_version.go b/admin-cli/executor/data_version.go new file mode 100644 index 00000000..97c9d8c9 --- /dev/null +++ b/admin-cli/executor/data_version.go @@ -0,0 +1,66 @@ +package executor + +import ( + "encoding/json" + "fmt" + "github.com/apache/incubator-pegasus/admin-cli/util" + "github.com/apache/incubator-pegasus/go-client/session" +) + +type TableDataVersion struct { + DataVersion string `json:"data_version"` +} + +func QueryTableVersion(client *Client, table string) error { + version, err := QueryReplicaDataVersion(client, table) + if err != nil { + return nil + } + + // formats into JSON + outputBytes, _ := json.MarshalIndent(version, "", " ") + fmt.Fprintln(client, string(outputBytes)) + return nil +} + +func QueryReplicaDataVersion(client *Client, table string) (*TableDataVersion, error) { + nodes := client.Nodes.GetAllNodes(session.NodeTypeReplica) + resp, err := client.Meta.QueryConfig(table) + if err != nil { + return nil, err + } + + args := util.Arguments{ + Name: "app_id", + Value: string(resp.AppID), + } + results := util.BatchCallHTTP(nodes, getTableDataVersion, args) + + var finalVersion string + var version TableDataVersion + for _, result := range results { + if result.Err != nil { + return nil, result.Err + } + err := json.Unmarshal([]byte(result.Resp), &version) + if err != nil { + return nil, err + } + + if finalVersion == "" { + finalVersion = version.DataVersion + } else { + if version.DataVersion == finalVersion { + continue + } else { + return nil, fmt.Errorf("replica versions are not consistent!") + } + } + } + return &version, nil +} + +func getTableDataVersion(addr string, args util.Arguments) (string, error) { + url := fmt.Sprintf("http://%s/replica/data_version?%s=%s", args.Name, args.Value) + return util.CallHTTPGet(url) +} diff --git a/admin-cli/executor/server_config.go b/admin-cli/executor/server_config.go index f17cf27c..e857d702 100644 --- a/admin-cli/executor/server_config.go +++ b/admin-cli/executor/server_config.go @@ -20,26 +20,20 @@ package executor import ( - "context" "encoding/json" "fmt" "sort" "strings" - "sync" - "time" "github.com/apache/incubator-pegasus/admin-cli/util" "github.com/apache/incubator-pegasus/go-client/session" - "github.com/go-resty/resty/v2" ) -type httpRequest func(addr string, cmd command) (string, error) - -// map[*util.PegasusNode]*cmdResult is not sorted, pass nodes is for print sorted result -type printResponse func(nodeType session.NodeType, sortedNodeList []string, resp map[string]*cmdResult) +// map[*util.PegasusNode]*util.Result is not sorted, pass nodes is for print sorted result +type printResponse func(nodeType session.NodeType, sortedNodeList []string, resp map[string]*util.Result) type action struct { - request httpRequest + request util.HttpRequest print printResponse } @@ -55,11 +49,6 @@ var sectionsMap = map[session.NodeType]string{ // TODO(jiashuo1) support collector } -type command struct { - name string - value string -} - type response struct { Name string Section string @@ -67,11 +56,6 @@ type response struct { Value string } -type cmdResult struct { - resp string - err error -} - //TODO(jiashuo1) not support update collector config func ConfigCommand(client *Client, nodeType session.NodeType, nodeAddr string, name string, actionType string, value string) error { var nodes []*util.PegasusNode @@ -87,11 +71,11 @@ func ConfigCommand(client *Client, nodeType session.NodeType, nodeAddr string, n } if ac, ok := actionsMap[actionType]; ok { - cmd := command{ - name: name, - value: value, + cmd := util.Arguments{ + Name: name, + Value: value, } - results := batchCallHTTP(nodes, ac.request, cmd) + results := util.BatchCallHTTP(nodes, ac.request, cmd) var sortedNodeList []string for _, n := range nodes { @@ -106,59 +90,22 @@ func ConfigCommand(client *Client, nodeType session.NodeType, nodeAddr string, n return nil } -func batchCallHTTP(nodes []*util.PegasusNode, request httpRequest, cmd command) map[string]*cmdResult { - results := make(map[string]*cmdResult) - - var mu sync.Mutex - var wg sync.WaitGroup - wg.Add(len(nodes)) - for _, n := range nodes { - go func(node *util.PegasusNode) { - _, cancel := context.WithTimeout(context.Background(), time.Second*10) - defer cancel() - result, err := request(node.TCPAddr(), cmd) - mu.Lock() - if err != nil { - results[node.CombinedAddr()] = &cmdResult{err: err} - } else { - results[node.CombinedAddr()] = &cmdResult{resp: result} - } - mu.Unlock() - wg.Done() - }(n) - } - wg.Wait() - - return results -} - -func callHTTP(url string) (string, error) { - resp, err := resty.New().SetTimeout(time.Second * 10).R().Get(url) - if err != nil { - return "", fmt.Errorf("failed to call \"%s\": %s", url, err) - } - if resp.StatusCode() != 200 { - return "", fmt.Errorf("failed to call \"%s\": code=%d", url, resp.StatusCode()) - } - return string(resp.Body()), nil -} - -func listConfig(addr string, cmd command) (string, error) { +func listConfig(addr string, cmd util.Arguments) (string, error) { url := fmt.Sprintf("http://%s/configs", addr) - return callHTTP(url) + return util.CallHTTPGet(url) } -func printConfigList(nodeType session.NodeType, sortedNodeList []string, results map[string]*cmdResult) { +func printConfigList(nodeType session.NodeType, sortedNodeList []string, results map[string]*util.Result) { fmt.Printf("CMD: list \n") for _, node := range sortedNodeList { cmdRes := results[node] - if cmdRes.err != nil { - fmt.Printf("[%s] %s\n", node, cmdRes.err) + if cmdRes.Err != nil { + fmt.Printf("[%s] %s\n", node, cmdRes.Err) continue } var respMap map[string]response - err := json.Unmarshal([]byte(cmdRes.resp), &respMap) + err := json.Unmarshal([]byte(cmdRes.Resp), &respMap) if err != nil { fmt.Printf("[%s] %s\n", node, err) continue @@ -178,24 +125,24 @@ func printConfigList(nodeType session.NodeType, sortedNodeList []string, results } } -func getConfig(addr string, cmd command) (string, error) { - url := fmt.Sprintf("http://%s/config?name=%s", addr, cmd.name) - return callHTTP(url) +func getConfig(addr string, cmd util.Arguments) (string, error) { + url := fmt.Sprintf("http://%s/config?name=%s", addr, cmd.Name) + return util.CallHTTPGet(url) } -func printConfigValue(nodeType session.NodeType, sortedNodeList []string, results map[string]*cmdResult) { +func printConfigValue(nodeType session.NodeType, sortedNodeList []string, results map[string]*util.Result) { fmt.Printf("CMD: get \n") for _, node := range sortedNodeList { cmdRes := results[node] - if cmdRes.err != nil { - fmt.Printf("[%s] %s\n", node, cmdRes.err) + if cmdRes.Err != nil { + fmt.Printf("[%s] %s\n", node, cmdRes.Err) continue } var resp response - err := json.Unmarshal([]byte(cmdRes.resp), &resp) + err := json.Unmarshal([]byte(cmdRes.Resp), &resp) if err != nil { - fmt.Printf("[%s] %s\n", node, cmdRes.resp) + fmt.Printf("[%s] %s\n", node, cmdRes.Resp) continue } @@ -208,22 +155,22 @@ func printConfigValue(nodeType session.NodeType, sortedNodeList []string, result } } -func updateConfig(addr string, cmd command) (string, error) { - url := fmt.Sprintf("http://%s/updateConfig?%s=%s", addr, cmd.name, cmd.value) - return callHTTP(url) +func updateConfig(addr string, cmd util.Arguments) (string, error) { + url := fmt.Sprintf("http://%s/updateConfig?%s=%s", addr, cmd.Name, cmd.Value) + return util.CallHTTPGet(url) } -func printConfigUpdate(nodeType session.NodeType, sortedNodeList []string, results map[string]*cmdResult) { +func printConfigUpdate(nodeType session.NodeType, sortedNodeList []string, results map[string]*util.Result) { fmt.Printf("CMD: set \n") for _, node := range sortedNodeList { cmdRes := results[node] - if cmdRes.err != nil { - fmt.Printf("[%s] %s\n", node, cmdRes.err) + if cmdRes.Err != nil { + fmt.Printf("[%s] %s\n", node, cmdRes.Err) continue } var resMap map[string]string - err := json.Unmarshal([]byte(cmdRes.resp), &resMap) + err := json.Unmarshal([]byte(cmdRes.Resp), &resMap) if err != nil { fmt.Printf("[%s] %s\n", node, err) continue diff --git a/admin-cli/executor/toolkits/tablemigrator/migrator.go b/admin-cli/executor/toolkits/tablemigrator/migrator.go new file mode 100644 index 00000000..e18c3744 --- /dev/null +++ b/admin-cli/executor/toolkits/tablemigrator/migrator.go @@ -0,0 +1,11 @@ +package tablemigrator + +/** +1. check data version +2. create table duplication +3. check confirm decree if < 5k +4. set env config deny write request +5. check confirm decree if == 0 +6. switch table env addrs +7. set env config deny and re-config +*/ diff --git a/admin-cli/util/http_client.go b/admin-cli/util/http_client.go new file mode 100644 index 00000000..473c6fb7 --- /dev/null +++ b/admin-cli/util/http_client.go @@ -0,0 +1,58 @@ +package util + +import ( + "context" + "fmt" + "github.com/go-resty/resty/v2" + "sync" + "time" +) + +type Arguments struct { + Name string + Value string +} + +type Result struct { + Resp string + Err error +} + +type HttpRequest func(addr string, args Arguments) (string, error) + +func BatchCallHTTP(nodes []*PegasusNode, request HttpRequest, args Arguments) map[string]*Result { + results := make(map[string]*Result) + + var mu sync.Mutex + var wg sync.WaitGroup + wg.Add(len(nodes)) + for _, n := range nodes { + go func(node *PegasusNode) { + _, cancel := context.WithTimeout(context.Background(), time.Second*10) + defer cancel() + result, err := request(node.TCPAddr(), args) + mu.Lock() + if err != nil { + results[node.CombinedAddr()] = &Result{Err: err} + } else { + results[node.CombinedAddr()] = &Result{Resp: result} + } + mu.Unlock() + wg.Done() + }(n) + } + wg.Wait() + + return results +} + +func CallHTTPGet(url string) (string, error) { + resp, err := resty.New().SetTimeout(time.Second * 10).R().Get(url) + if err != nil { + return "", fmt.Errorf("failed to call \"%s\": %s", url, err) + } + if resp.StatusCode() != 200 { + return "", fmt.Errorf("failed to call \"%s\": code=%d", url, resp.StatusCode()) + } + return string(resp.Body()), nil +} --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
