This is an automated email from the ASF dual-hosted git repository.

jiashuo pushed a commit to branch add-table-migrator
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git

commit 1dcfcbd2f67f16d9e65315ff643b2e8898998416
Author: jiashuo <[email protected]>
AuthorDate: Mon Jun 13 18:46:30 2022 +0800

    update
---
 .../executor/toolkits/metaproxy/meta_proxy.go      | 164 +++++++++++++++++++++
 .../executor/toolkits/tablemigrator/migrator.go    | 109 +++++++++++++-
 admin-cli/go.mod                                   |   1 +
 admin-cli/go.sum                                   |   2 +
 4 files changed, 274 insertions(+), 2 deletions(-)

diff --git a/admin-cli/executor/toolkits/metaproxy/meta_proxy.go 
b/admin-cli/executor/toolkits/metaproxy/meta_proxy.go
new file mode 100644
index 00000000..bc347807
--- /dev/null
+++ b/admin-cli/executor/toolkits/metaproxy/meta_proxy.go
@@ -0,0 +1,164 @@
+package metaproxy
+
+import (
+       "encoding/json"
+       "fmt"
+       "github.com/apache/incubator-pegasus/admin-cli/executor"
+       "os"
+       "time"
+
+       "github.com/go-zookeeper/zk"
+)
+
+func getTableAddrInMetaProxy(client *executor.Client, zkAddr string, zkRoot 
string, tableName string) error {
+       cluster, err := client.Meta.QueryClusterInfo()
+       if err != nil {
+               return err
+       }
+
+       if zkAddr == "" {
+               zkAddr = cluster["zookeeper_hosts"]
+       }
+       zkConn, _, err := zk.Connect([]string{zkAddr}, 
time.Duration(1000*1000*1000))
+       if err != nil {
+               return err
+       }
+       defer zkConn.Close()
+
+       currentRemoteZKInfo, err := ReadZkData(zkConn, zkRoot, tableName)
+       if err != nil {
+               return err
+       }
+       // formats into JSON
+       outputBytes, _ := json.MarshalIndent(currentRemoteZKInfo, "", "  ")
+       fmt.Fprintln(client, string(outputBytes))
+       return nil
+}
+
+func addTableAddrInMetaProxy(client *executor.Client, zkAddr string, zkRoot 
string, tableName string) error {
+       cluster, err := client.Meta.QueryClusterInfo()
+       if err != nil {
+               return err
+       }
+
+       if zkAddr == "" {
+               zkAddr = cluster["zookeeper_hosts"]
+       }
+       zkConn, _, err := zk.Connect([]string{zkAddr}, 
time.Duration(1000*1000*1000))
+       if err != nil {
+               return err
+       }
+       defer zkConn.Close()
+
+       clusterName := cluster["cluster_name"]
+       clusterAddr := cluster["meta_servers"]
+       _, _, err = WriteZkData(zkConn, zkRoot, tableName, clusterName, 
clusterAddr)
+       if err != nil {
+               return err
+       }
+       // formats into JSON
+       tableInfo := MetaProxyTable{
+               ClusterName: clusterName,
+               MetaAddrs:   clusterAddr,
+       }
+
+       outputBytes, _ := json.MarshalIndent(tableInfo, "", "  ")
+       fmt.Fprintln(client, string(outputBytes))
+       return nil
+}
+
+func SwitchMetaAddrs(client *executor.Client, zkAddr string, zkRoot string, 
tableName string, targetAddrs string) error {
+       cluster, err := client.Meta.QueryClusterInfo()
+       if err != nil {
+               return err
+       }
+
+       if zkAddr == "" {
+               zkAddr = cluster["zookeeper_hosts"]
+       }
+       zkConn, _, err := zk.Connect([]string{zkAddr}, 
time.Duration(1000*1000*1000))
+       if err != nil {
+               return err
+       }
+       defer zkConn.Close()
+
+       currentRemoteZKInfo, err := ReadZkData(zkConn, zkRoot, tableName)
+       if err != nil {
+               return err
+       }
+
+       currentLocalCluster := cluster["cluster_name"]
+       if currentRemoteZKInfo.ClusterName != currentLocalCluster {
+               return fmt.Errorf("current remote table is not `current local 
cluster`, remote vs expect= %s : %s",
+                       currentRemoteZKInfo.ClusterName, currentLocalCluster)
+       }
+
+       originMeta := client.Meta
+       targetMeta := executor.NewClient(os.Stdout, []string{}).Meta
+       env := map[string]string{
+               "replica.deny_client_request": "reconfig*all",
+       }
+
+       targetCluster, err := targetMeta.QueryClusterInfo()
+       if err != nil {
+               return err
+       }
+       _, _, err = WriteZkData(zkConn, zkRoot, tableName, 
targetCluster["cluster_name"], targetAddrs)
+       if err != nil {
+               return err
+       }
+
+       err = originMeta.UpdateAppEnvs(tableName, env)
+       if err != nil {
+               return err
+       }
+       return nil
+}
+
+type MetaProxyTable struct {
+       ClusterName string `json:"cluster_name"`
+       MetaAddrs   string `json:"meta_addrs"`
+}
+
+func ReadZkData(zkConn *zk.Conn, root string, table string) (*MetaProxyTable, 
error) {
+       tablePath := fmt.Sprintf("%s/%s", root, table)
+       exist, _, _ := zkConn.Exists(tablePath)
+       if !exist {
+               return nil, fmt.Errorf("can't find the zk path: %s", tablePath)
+       }
+
+       data, _, err := zkConn.Get(tablePath)
+       if err != nil {
+               return nil, err
+       }
+
+       metaProxyTable := MetaProxyTable{}
+       err = json.Unmarshal(data, &metaProxyTable)
+       if err != nil {
+               return nil, err
+       }
+       return &metaProxyTable, nil
+}
+
+func WriteZkData(zkConn *zk.Conn, root string, table string, cluster string, 
addrs string) (string, string, error) {
+       zkData := encodeToZkNodeData(cluster, addrs)
+       tablePath := fmt.Sprintf("%s/%s", root, table)
+       exist, stat, _ := zkConn.Exists(tablePath)
+       if !exist {
+               _, err := zkConn.Create(tablePath, zkData, 0, 
zk.WorldACL(zk.PermAll))
+               if err != nil {
+                       return "", "", err
+               }
+       }
+       _, err := zkConn.Set(tablePath, zkData, stat.Version)
+       if err != nil {
+               return "", "", err
+       }
+
+       return tablePath, string(zkData), nil
+}
+
+func encodeToZkNodeData(cluster string, addr string) []byte {
+       data := fmt.Sprintf("{\"cluster_name\": \"%s\", \"meta_addrs\": 
\"%s\"}", cluster, addr)
+       return []byte(data)
+}
diff --git a/admin-cli/executor/toolkits/tablemigrator/migrator.go 
b/admin-cli/executor/toolkits/tablemigrator/migrator.go
index e18c3744..e6b0f7a3 100644
--- a/admin-cli/executor/toolkits/tablemigrator/migrator.go
+++ b/admin-cli/executor/toolkits/tablemigrator/migrator.go
@@ -1,11 +1,116 @@
 package tablemigrator
 
+import (
+       "fmt"
+       "github.com/apache/incubator-pegasus/admin-cli/executor"
+       "github.com/apache/incubator-pegasus/admin-cli/executor/toolkits"
+       
"github.com/apache/incubator-pegasus/admin-cli/executor/toolkits/metaproxy"
+       "github.com/apache/incubator-pegasus/admin-cli/util"
+       "github.com/apache/incubator-pegasus/go-client/session"
+       "github.com/pegasus-kv/collector/aggregate"
+       "time"
+)
+
 /**
 1. check data version
 2. create table duplication
 3. check confirm decree if < 5k
 4. set env config deny write request
-5. check confirm decree if == 0
+5. check duplicate qps decree if == 0
 6. switch table env addrs
-7. set env config deny and re-config
 */
+
+func MigrateTable(client *executor.Client, table string, toCluster string) 
error {
+       //1. check data version
+       version, err := executor.QueryReplicaDataVersion(client, table)
+       if err != nil {
+               return nil
+       }
+       if version.DataVersion != "1" {
+               return fmt.Errorf("not support data version = 0 to migrate by 
duplication")
+       }
+
+       //2. create data version
+       err = executor.AddDuplication(client, table, toCluster, false)
+       if err != nil {
+               return err
+       }
+
+       //3. check un-confirm decree is less 5k
+       nodes := client.Nodes.GetAllNodes(session.NodeTypeReplica)
+       var perfSessions []*aggregate.PerfSession
+       for _, n := range nodes {
+               perfSessions = append(perfSessions, 
client.Nodes.GetPerfSession(n.TCPAddr(), session.NodeTypeReplica))
+       }
+       err = checkUnConfirmedDecree(perfSessions, 5000)
+       if err != nil {
+               return err
+       }
+       //4. set env config deny write request
+       var envs = map[string]string{
+               "replica.deny_client_request": "timeout*write",
+       }
+       err = client.Meta.UpdateAppEnvs(table, envs)
+       if err != nil {
+               return err
+       }
+       //5. check duplicate qps if equal 0
+       resp, err := client.Meta.QueryConfig(table)
+       if err != nil {
+               return err
+       }
+       err = checkDuplicateQPS(perfSessions, resp.AppID)
+       if err != nil {
+               return err
+       }
+       //6. switch table addrs in metaproxy
+       err = metaproxy.SwitchMetaAddrs(client, "", "", "", "")
+       if err != nil {
+               return err
+       }
+       return nil
+}
+
+func checkUnConfirmedDecree(perfSessions []*aggregate.PerfSession, threshold 
float64) error {
+       completed := false
+       for !completed {
+               completed = true
+               time.Sleep(10 * time.Second)
+               for _, perf := range perfSessions {
+                       stats, err := 
perf.GetPerfCounters("pending_mutations_count")
+                       if err != nil {
+                               return err
+                       }
+                       if len(stats) != 1 {
+                               return fmt.Errorf("get pending_mutations_count 
perfcounter size must be 1, but now is %d", len(stats))
+                       }
+
+                       if stats[0].Value > threshold {
+                               completed = false
+                               toolkits.LogDebug(fmt.Sprintf("%s has 
pending_mutations_count %f", perf.Address, stats[0].Value))
+                               break
+                       }
+               }
+       }
+       return nil
+}
+
+func checkDuplicateQPS(perfSessions []*aggregate.PerfSession, tableID int32) 
error {
+       completed := false
+       counter := fmt.Sprintf("duplicate_qps@%d", tableID)
+       for !completed {
+               completed = true
+               time.Sleep(10 * time.Second)
+               for _, perf := range perfSessions {
+                       stats := util.GetPartitionStat(perf, counter)
+                       for gpid, qps := range stats {
+                               if qps > 0 {
+                                       completed = false
+                                       toolkits.LogDebug(fmt.Sprintf("%s[%s] 
still sending pending mutation %f", perf.Address, gpid, qps))
+                                       break
+                               }
+                       }
+               }
+       }
+       return nil
+}
diff --git a/admin-cli/go.mod b/admin-cli/go.mod
index 99f0bbdb..491ba295 100644
--- a/admin-cli/go.mod
+++ b/admin-cli/go.mod
@@ -26,6 +26,7 @@ require (
        github.com/dustin/go-humanize v1.0.0
        github.com/fsnotify/fsnotify v1.4.9 // indirect
        github.com/go-resty/resty/v2 v2.6.0
+       github.com/go-zookeeper/zk v1.0.2
        github.com/hashicorp/go-multierror v1.1.1 // indirect
        github.com/kr/pretty v0.2.1 // indirect
        github.com/magiconair/properties v1.8.4 // indirect
diff --git a/admin-cli/go.sum b/admin-cli/go.sum
index 4d96344b..db1b7463 100644
--- a/admin-cli/go.sum
+++ b/admin-cli/go.sum
@@ -159,6 +159,8 @@ github.com/go-resty/resty/v2 v2.6.0 
h1:joIR5PNLM2EFqqESUjCMGXrWmXNHEU9CEiK813oKY
 github.com/go-resty/resty/v2 v2.6.0/go.mod 
h1:PwvJS6hvaPkjtjNg9ph+VrSD92bi5Zq73w/BIH7cC3Q=
 github.com/go-sql-driver/mysql v1.4.0/go.mod 
h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w=
 github.com/go-stack/stack v1.8.0/go.mod 
h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY=
+github.com/go-zookeeper/zk v1.0.2 
h1:4mx0EYENAdX/B/rbunjlt5+4RTA/a9SMHBRuSKdGxPM=
+github.com/go-zookeeper/zk v1.0.2/go.mod 
h1:nOB03cncLtlp4t+UAkGSV+9beXP/akpekBwL+UX1Qcw=
 github.com/gobwas/httphead v0.0.0-20180130184737-2c6c146eadee/go.mod 
h1:L0fX3K22YWvt/FAX9NnzrNzcI4wNYi9Yku4O0LKYflo=
 github.com/gobwas/pool v0.2.0/go.mod 
h1:q8bcK0KcYlCgd9e7WYLm9LpyS+YeLd8JVDW6WezmKEw=
 github.com/gobwas/ws v1.0.2/go.mod 
h1:szmBTxLgaFppYjEmNtny/v3w89xOydFnnZMcgRRu/EM=


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to