This is an automated email from the ASF dual-hosted git repository. jiashuo pushed a commit to branch add-table-migrator in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git
commit 1dcfcbd2f67f16d9e65315ff643b2e8898998416 Author: jiashuo <[email protected]> AuthorDate: Mon Jun 13 18:46:30 2022 +0800 update --- .../executor/toolkits/metaproxy/meta_proxy.go | 164 +++++++++++++++++++++ .../executor/toolkits/tablemigrator/migrator.go | 109 +++++++++++++- admin-cli/go.mod | 1 + admin-cli/go.sum | 2 + 4 files changed, 274 insertions(+), 2 deletions(-) diff --git a/admin-cli/executor/toolkits/metaproxy/meta_proxy.go b/admin-cli/executor/toolkits/metaproxy/meta_proxy.go new file mode 100644 index 00000000..bc347807 --- /dev/null +++ b/admin-cli/executor/toolkits/metaproxy/meta_proxy.go @@ -0,0 +1,164 @@ +package metaproxy + +import ( + "encoding/json" + "fmt" + "github.com/apache/incubator-pegasus/admin-cli/executor" + "os" + "time" + + "github.com/go-zookeeper/zk" +) + +func getTableAddrInMetaProxy(client *executor.Client, zkAddr string, zkRoot string, tableName string) error { + cluster, err := client.Meta.QueryClusterInfo() + if err != nil { + return err + } + + if zkAddr == "" { + zkAddr = cluster["zookeeper_hosts"] + } + zkConn, _, err := zk.Connect([]string{zkAddr}, time.Duration(1000*1000*1000)) + if err != nil { + return err + } + defer zkConn.Close() + + currentRemoteZKInfo, err := ReadZkData(zkConn, zkRoot, tableName) + if err != nil { + return err + } + // formats into JSON + outputBytes, _ := json.MarshalIndent(currentRemoteZKInfo, "", " ") + fmt.Fprintln(client, string(outputBytes)) + return nil +} + +func addTableAddrInMetaProxy(client *executor.Client, zkAddr string, zkRoot string, tableName string) error { + cluster, err := client.Meta.QueryClusterInfo() + if err != nil { + return err + } + + if zkAddr == "" { + zkAddr = cluster["zookeeper_hosts"] + } + zkConn, _, err := zk.Connect([]string{zkAddr}, time.Duration(1000*1000*1000)) + if err != nil { + return err + } + defer zkConn.Close() + + clusterName := cluster["cluster_name"] + clusterAddr := cluster["meta_servers"] + _, _, err = WriteZkData(zkConn, zkRoot, tableName, clusterName, clusterAddr) + if err != nil { + return err + } + // formats into JSON + tableInfo := MetaProxyTable{ + ClusterName: clusterName, + MetaAddrs: clusterAddr, + } + + outputBytes, _ := json.MarshalIndent(tableInfo, "", " ") + fmt.Fprintln(client, string(outputBytes)) + return nil +} + +func SwitchMetaAddrs(client *executor.Client, zkAddr string, zkRoot string, tableName string, targetAddrs string) error { + cluster, err := client.Meta.QueryClusterInfo() + if err != nil { + return err + } + + if zkAddr == "" { + zkAddr = cluster["zookeeper_hosts"] + } + zkConn, _, err := zk.Connect([]string{zkAddr}, time.Duration(1000*1000*1000)) + if err != nil { + return err + } + defer zkConn.Close() + + currentRemoteZKInfo, err := ReadZkData(zkConn, zkRoot, tableName) + if err != nil { + return err + } + + currentLocalCluster := cluster["cluster_name"] + if currentRemoteZKInfo.ClusterName != currentLocalCluster { + return fmt.Errorf("current remote table is not `current local cluster`, remote vs expect= %s : %s", + currentRemoteZKInfo.ClusterName, currentLocalCluster) + } + + originMeta := client.Meta + targetMeta := executor.NewClient(os.Stdout, []string{}).Meta + env := map[string]string{ + "replica.deny_client_request": "reconfig*all", + } + + targetCluster, err := targetMeta.QueryClusterInfo() + if err != nil { + return err + } + _, _, err = WriteZkData(zkConn, zkRoot, tableName, targetCluster["cluster_name"], targetAddrs) + if err != nil { + return err + } + + err = originMeta.UpdateAppEnvs(tableName, env) + if err != nil { + return err + } + return nil +} + +type MetaProxyTable struct { + ClusterName string `json:"cluster_name"` + MetaAddrs string `json:"meta_addrs"` +} + +func ReadZkData(zkConn *zk.Conn, root string, table string) (*MetaProxyTable, error) { + tablePath := fmt.Sprintf("%s/%s", root, table) + exist, _, _ := zkConn.Exists(tablePath) + if !exist { + return nil, fmt.Errorf("can't find the zk path: %s", tablePath) + } + + data, _, err := zkConn.Get(tablePath) + if err != nil { + return nil, err + } + + metaProxyTable := MetaProxyTable{} + err = json.Unmarshal(data, &metaProxyTable) + if err != nil { + return nil, err + } + return &metaProxyTable, nil +} + +func WriteZkData(zkConn *zk.Conn, root string, table string, cluster string, addrs string) (string, string, error) { + zkData := encodeToZkNodeData(cluster, addrs) + tablePath := fmt.Sprintf("%s/%s", root, table) + exist, stat, _ := zkConn.Exists(tablePath) + if !exist { + _, err := zkConn.Create(tablePath, zkData, 0, zk.WorldACL(zk.PermAll)) + if err != nil { + return "", "", err + } + } + _, err := zkConn.Set(tablePath, zkData, stat.Version) + if err != nil { + return "", "", err + } + + return tablePath, string(zkData), nil +} + +func encodeToZkNodeData(cluster string, addr string) []byte { + data := fmt.Sprintf("{\"cluster_name\": \"%s\", \"meta_addrs\": \"%s\"}", cluster, addr) + return []byte(data) +} diff --git a/admin-cli/executor/toolkits/tablemigrator/migrator.go b/admin-cli/executor/toolkits/tablemigrator/migrator.go index e18c3744..e6b0f7a3 100644 --- a/admin-cli/executor/toolkits/tablemigrator/migrator.go +++ b/admin-cli/executor/toolkits/tablemigrator/migrator.go @@ -1,11 +1,116 @@ package tablemigrator +import ( + "fmt" + "github.com/apache/incubator-pegasus/admin-cli/executor" + "github.com/apache/incubator-pegasus/admin-cli/executor/toolkits" + "github.com/apache/incubator-pegasus/admin-cli/executor/toolkits/metaproxy" + "github.com/apache/incubator-pegasus/admin-cli/util" + "github.com/apache/incubator-pegasus/go-client/session" + "github.com/pegasus-kv/collector/aggregate" + "time" +) + /** 1. check data version 2. create table duplication 3. check confirm decree if < 5k 4. set env config deny write request -5. check confirm decree if == 0 +5. check duplicate qps decree if == 0 6. switch table env addrs -7. set env config deny and re-config */ + +func MigrateTable(client *executor.Client, table string, toCluster string) error { + //1. check data version + version, err := executor.QueryReplicaDataVersion(client, table) + if err != nil { + return nil + } + if version.DataVersion != "1" { + return fmt.Errorf("not support data version = 0 to migrate by duplication") + } + + //2. create data version + err = executor.AddDuplication(client, table, toCluster, false) + if err != nil { + return err + } + + //3. check un-confirm decree is less 5k + nodes := client.Nodes.GetAllNodes(session.NodeTypeReplica) + var perfSessions []*aggregate.PerfSession + for _, n := range nodes { + perfSessions = append(perfSessions, client.Nodes.GetPerfSession(n.TCPAddr(), session.NodeTypeReplica)) + } + err = checkUnConfirmedDecree(perfSessions, 5000) + if err != nil { + return err + } + //4. set env config deny write request + var envs = map[string]string{ + "replica.deny_client_request": "timeout*write", + } + err = client.Meta.UpdateAppEnvs(table, envs) + if err != nil { + return err + } + //5. check duplicate qps if equal 0 + resp, err := client.Meta.QueryConfig(table) + if err != nil { + return err + } + err = checkDuplicateQPS(perfSessions, resp.AppID) + if err != nil { + return err + } + //6. switch table addrs in metaproxy + err = metaproxy.SwitchMetaAddrs(client, "", "", "", "") + if err != nil { + return err + } + return nil +} + +func checkUnConfirmedDecree(perfSessions []*aggregate.PerfSession, threshold float64) error { + completed := false + for !completed { + completed = true + time.Sleep(10 * time.Second) + for _, perf := range perfSessions { + stats, err := perf.GetPerfCounters("pending_mutations_count") + if err != nil { + return err + } + if len(stats) != 1 { + return fmt.Errorf("get pending_mutations_count perfcounter size must be 1, but now is %d", len(stats)) + } + + if stats[0].Value > threshold { + completed = false + toolkits.LogDebug(fmt.Sprintf("%s has pending_mutations_count %f", perf.Address, stats[0].Value)) + break + } + } + } + return nil +} + +func checkDuplicateQPS(perfSessions []*aggregate.PerfSession, tableID int32) error { + completed := false + counter := fmt.Sprintf("duplicate_qps@%d", tableID) + for !completed { + completed = true + time.Sleep(10 * time.Second) + for _, perf := range perfSessions { + stats := util.GetPartitionStat(perf, counter) + for gpid, qps := range stats { + if qps > 0 { + completed = false + toolkits.LogDebug(fmt.Sprintf("%s[%s] still sending pending mutation %f", perf.Address, gpid, qps)) + break + } + } + } + } + return nil +} diff --git a/admin-cli/go.mod b/admin-cli/go.mod index 99f0bbdb..491ba295 100644 --- a/admin-cli/go.mod +++ b/admin-cli/go.mod @@ -26,6 +26,7 @@ require ( github.com/dustin/go-humanize v1.0.0 github.com/fsnotify/fsnotify v1.4.9 // indirect github.com/go-resty/resty/v2 v2.6.0 + github.com/go-zookeeper/zk v1.0.2 github.com/hashicorp/go-multierror v1.1.1 // indirect github.com/kr/pretty v0.2.1 // indirect github.com/magiconair/properties v1.8.4 // indirect diff --git a/admin-cli/go.sum b/admin-cli/go.sum index 4d96344b..db1b7463 100644 --- a/admin-cli/go.sum +++ b/admin-cli/go.sum @@ -159,6 +159,8 @@ github.com/go-resty/resty/v2 v2.6.0 h1:joIR5PNLM2EFqqESUjCMGXrWmXNHEU9CEiK813oKY github.com/go-resty/resty/v2 v2.6.0/go.mod h1:PwvJS6hvaPkjtjNg9ph+VrSD92bi5Zq73w/BIH7cC3Q= github.com/go-sql-driver/mysql v1.4.0/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w= github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= +github.com/go-zookeeper/zk v1.0.2 h1:4mx0EYENAdX/B/rbunjlt5+4RTA/a9SMHBRuSKdGxPM= +github.com/go-zookeeper/zk v1.0.2/go.mod h1:nOB03cncLtlp4t+UAkGSV+9beXP/akpekBwL+UX1Qcw= github.com/gobwas/httphead v0.0.0-20180130184737-2c6c146eadee/go.mod h1:L0fX3K22YWvt/FAX9NnzrNzcI4wNYi9Yku4O0LKYflo= github.com/gobwas/pool v0.2.0/go.mod h1:q8bcK0KcYlCgd9e7WYLm9LpyS+YeLd8JVDW6WezmKEw= github.com/gobwas/ws v1.0.2/go.mod h1:szmBTxLgaFppYjEmNtny/v3w89xOydFnnZMcgRRu/EM= --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
