This is an automated email from the ASF dual-hosted git repository. jiashuo pushed a commit to branch add-table-migrator in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git
commit f76837d713eb11e40512b380e0fa45b3e3cd734f Author: jiashuo <[email protected]> AuthorDate: Tue Jun 14 15:10:57 2022 +0800 update --- admin-cli/cmd/table_env.go | 1 + admin-cli/cmd/table_migrator.go | 28 +++++++++ admin-cli/executor/data_version.go | 5 +- admin-cli/executor/server_config.go | 2 +- .../executor/toolkits/tablemigrator/migrator.go | 29 +++++---- .../meta_proxy.go => tablemigrator/switcher.go} | 68 +++------------------- admin-cli/util/http_client.go | 7 ++- 7 files changed, 57 insertions(+), 83 deletions(-) diff --git a/admin-cli/cmd/table_env.go b/admin-cli/cmd/table_env.go index ba9bcf94..cd10eef1 100644 --- a/admin-cli/cmd/table_env.go +++ b/admin-cli/cmd/table_env.go @@ -31,6 +31,7 @@ import ( var predefinedAppEnvKeys = []string{ "rocksdb.usage_scenario", "replica.deny_client_write", + "replica.deny_client_request", "replica.write_throttling", "replica.write_throttling_by_size", "default_ttl", diff --git a/admin-cli/cmd/table_migrator.go b/admin-cli/cmd/table_migrator.go new file mode 100644 index 00000000..0b4d13cf --- /dev/null +++ b/admin-cli/cmd/table_migrator.go @@ -0,0 +1,28 @@ +package cmd + +import ( + "github.com/apache/incubator-pegasus/admin-cli/executor/toolkits/tablemigrator" + "github.com/apache/incubator-pegasus/admin-cli/shell" + "github.com/desertbit/grumble" +) + +func init() { + shell.AddCommand(&grumble.Command{ + Name: "table-migrator", + Help: "migrate table from current cluster to another via table duplication and metaproxy", + Run: func(c *grumble.Context) error { + return tablemigrator.MigrateTable(pegasusClient, c.Flags.String("table"), + c.Flags.String("node"), c.Flags.String("root"), + c.Flags.String("cluster"), c.Flags.String("meta")) + }, + Flags: func(f *grumble.Flags) { + f.String("t", "table", "", "table name") + f.String("n", "node", "", "zk node: addrs:port, default equal with peagsus "+ + "cluster zk addrs, you can use `cluster-info` to show it") + f.String("r", "root", "", "zk root path. the tool will update table addrs in "+ + "the path of meatproxy, if you don't specify it, that is means user need manual-switch the table addrs") + f.String("c", "cluster", "", "target cluster name") + f.String("m", "meta", "", "target meta list") + }, + }) +} diff --git a/admin-cli/executor/data_version.go b/admin-cli/executor/data_version.go index 97c9d8c9..22138093 100644 --- a/admin-cli/executor/data_version.go +++ b/admin-cli/executor/data_version.go @@ -3,6 +3,7 @@ package executor import ( "encoding/json" "fmt" + "github.com/apache/incubator-pegasus/admin-cli/util" "github.com/apache/incubator-pegasus/go-client/session" ) @@ -53,7 +54,7 @@ func QueryReplicaDataVersion(client *Client, table string) (*TableDataVersion, e if version.DataVersion == finalVersion { continue } else { - return nil, fmt.Errorf("replica versions are not consistent!") + return nil, fmt.Errorf("replica versions are not consistent") } } } @@ -61,6 +62,6 @@ func QueryReplicaDataVersion(client *Client, table string) (*TableDataVersion, e } func getTableDataVersion(addr string, args util.Arguments) (string, error) { - url := fmt.Sprintf("http://%s/replica/data_version?%s=%s", args.Name, args.Value) + url := fmt.Sprintf("http://%s/replica/data_version?%s=%s", addr, args.Name, args.Value) return util.CallHTTPGet(url) } diff --git a/admin-cli/executor/server_config.go b/admin-cli/executor/server_config.go index e857d702..062d53d9 100644 --- a/admin-cli/executor/server_config.go +++ b/admin-cli/executor/server_config.go @@ -33,7 +33,7 @@ import ( type printResponse func(nodeType session.NodeType, sortedNodeList []string, resp map[string]*util.Result) type action struct { - request util.HttpRequest + request util.HTTPRequestFunc print printResponse } diff --git a/admin-cli/executor/toolkits/tablemigrator/migrator.go b/admin-cli/executor/toolkits/tablemigrator/migrator.go index e6b0f7a3..8a3d2a42 100644 --- a/admin-cli/executor/toolkits/tablemigrator/migrator.go +++ b/admin-cli/executor/toolkits/tablemigrator/migrator.go @@ -2,25 +2,16 @@ package tablemigrator import ( "fmt" + "time" + "github.com/apache/incubator-pegasus/admin-cli/executor" "github.com/apache/incubator-pegasus/admin-cli/executor/toolkits" - "github.com/apache/incubator-pegasus/admin-cli/executor/toolkits/metaproxy" "github.com/apache/incubator-pegasus/admin-cli/util" "github.com/apache/incubator-pegasus/go-client/session" "github.com/pegasus-kv/collector/aggregate" - "time" ) -/** -1. check data version -2. create table duplication -3. check confirm decree if < 5k -4. set env config deny write request -5. check duplicate qps decree if == 0 -6. switch table env addrs -*/ - -func MigrateTable(client *executor.Client, table string, toCluster string) error { +func MigrateTable(client *executor.Client, table string, metaProxyZkAddrs string, metaProxyZkRoot string, targetCluster string, targetAddrs string) error { //1. check data version version, err := executor.QueryReplicaDataVersion(client, table) if err != nil { @@ -31,7 +22,7 @@ func MigrateTable(client *executor.Client, table string, toCluster string) error } //2. create data version - err = executor.AddDuplication(client, table, toCluster, false) + err = executor.AddDuplication(client, table, targetCluster, false) if err != nil { return err } @@ -59,12 +50,16 @@ func MigrateTable(client *executor.Client, table string, toCluster string) error if err != nil { return err } - err = checkDuplicateQPS(perfSessions, resp.AppID) + err = checkDuplicatingQPS(perfSessions, resp.AppID) if err != nil { return err } //6. switch table addrs in metaproxy - err = metaproxy.SwitchMetaAddrs(client, "", "", "", "") + if metaProxyZkRoot == "" { + toolkits.LogWarn("you don't specify enough meta proxy info, please manual-switch the table cluster!") + return nil + } + err = SwitchMetaAddrs(client, metaProxyZkAddrs, metaProxyZkRoot, table, targetAddrs) if err != nil { return err } @@ -92,10 +87,11 @@ func checkUnConfirmedDecree(perfSessions []*aggregate.PerfSession, threshold flo } } } + toolkits.LogDebug(fmt.Sprintf("all the node pending_mutations_count has less %f", threshold)) return nil } -func checkDuplicateQPS(perfSessions []*aggregate.PerfSession, tableID int32) error { +func checkDuplicatingQPS(perfSessions []*aggregate.PerfSession, tableID int32) error { completed := false counter := fmt.Sprintf("duplicate_qps@%d", tableID) for !completed { @@ -112,5 +108,6 @@ func checkDuplicateQPS(perfSessions []*aggregate.PerfSession, tableID int32) err } } } + toolkits.LogDebug("all the node has stop duplicate the pending wal") return nil } diff --git a/admin-cli/executor/toolkits/metaproxy/meta_proxy.go b/admin-cli/executor/toolkits/tablemigrator/switcher.go similarity index 61% rename from admin-cli/executor/toolkits/metaproxy/meta_proxy.go rename to admin-cli/executor/toolkits/tablemigrator/switcher.go index bc347807..c5dbf55c 100644 --- a/admin-cli/executor/toolkits/metaproxy/meta_proxy.go +++ b/admin-cli/executor/toolkits/tablemigrator/switcher.go @@ -1,72 +1,16 @@ -package metaproxy +package tablemigrator import ( "encoding/json" "fmt" - "github.com/apache/incubator-pegasus/admin-cli/executor" "os" + "strings" "time" + "github.com/apache/incubator-pegasus/admin-cli/executor" "github.com/go-zookeeper/zk" ) -func getTableAddrInMetaProxy(client *executor.Client, zkAddr string, zkRoot string, tableName string) error { - cluster, err := client.Meta.QueryClusterInfo() - if err != nil { - return err - } - - if zkAddr == "" { - zkAddr = cluster["zookeeper_hosts"] - } - zkConn, _, err := zk.Connect([]string{zkAddr}, time.Duration(1000*1000*1000)) - if err != nil { - return err - } - defer zkConn.Close() - - currentRemoteZKInfo, err := ReadZkData(zkConn, zkRoot, tableName) - if err != nil { - return err - } - // formats into JSON - outputBytes, _ := json.MarshalIndent(currentRemoteZKInfo, "", " ") - fmt.Fprintln(client, string(outputBytes)) - return nil -} - -func addTableAddrInMetaProxy(client *executor.Client, zkAddr string, zkRoot string, tableName string) error { - cluster, err := client.Meta.QueryClusterInfo() - if err != nil { - return err - } - - if zkAddr == "" { - zkAddr = cluster["zookeeper_hosts"] - } - zkConn, _, err := zk.Connect([]string{zkAddr}, time.Duration(1000*1000*1000)) - if err != nil { - return err - } - defer zkConn.Close() - - clusterName := cluster["cluster_name"] - clusterAddr := cluster["meta_servers"] - _, _, err = WriteZkData(zkConn, zkRoot, tableName, clusterName, clusterAddr) - if err != nil { - return err - } - // formats into JSON - tableInfo := MetaProxyTable{ - ClusterName: clusterName, - MetaAddrs: clusterAddr, - } - - outputBytes, _ := json.MarshalIndent(tableInfo, "", " ") - fmt.Fprintln(client, string(outputBytes)) - return nil -} - func SwitchMetaAddrs(client *executor.Client, zkAddr string, zkRoot string, tableName string, targetAddrs string) error { cluster, err := client.Meta.QueryClusterInfo() if err != nil { @@ -94,7 +38,8 @@ func SwitchMetaAddrs(client *executor.Client, zkAddr string, zkRoot string, tabl } originMeta := client.Meta - targetMeta := executor.NewClient(os.Stdout, []string{}).Meta + targetAddrList := strings.Split(targetAddrs, ",") + targetMeta := executor.NewClient(os.Stdout, targetAddrList).Meta env := map[string]string{ "replica.deny_client_request": "reconfig*all", } @@ -103,7 +48,7 @@ func SwitchMetaAddrs(client *executor.Client, zkAddr string, zkRoot string, tabl if err != nil { return err } - _, _, err = WriteZkData(zkConn, zkRoot, tableName, targetCluster["cluster_name"], targetAddrs) + _, updatedZkInfo, err := WriteZkData(zkConn, zkRoot, tableName, targetCluster["cluster_name"], targetAddrs) if err != nil { return err } @@ -112,6 +57,7 @@ func SwitchMetaAddrs(client *executor.Client, zkAddr string, zkRoot string, tabl if err != nil { return err } + fmt.Printf("%s has updated metaproxy addr from %v to %v, current table env is %v", tableName, currentRemoteZKInfo, updatedZkInfo, env) return nil } diff --git a/admin-cli/util/http_client.go b/admin-cli/util/http_client.go index 473c6fb7..3c456d53 100644 --- a/admin-cli/util/http_client.go +++ b/admin-cli/util/http_client.go @@ -3,9 +3,10 @@ package util import ( "context" "fmt" - "github.com/go-resty/resty/v2" "sync" "time" + + "github.com/go-resty/resty/v2" ) type Arguments struct { @@ -18,9 +19,9 @@ type Result struct { Err error } -type HttpRequest func(addr string, args Arguments) (string, error) +type HTTPRequestFunc func(addr string, args Arguments) (string, error) -func BatchCallHTTP(nodes []*PegasusNode, request HttpRequest, args Arguments) map[string]*Result { +func BatchCallHTTP(nodes []*PegasusNode, request HTTPRequestFunc, args Arguments) map[string]*Result { results := make(map[string]*Result) var mu sync.Mutex --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
