This is an automated email from the ASF dual-hosted git repository.

jiashuo pushed a commit to branch add-table-migrator
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git
commit 466d46541fb0b20f16e1fa7acc771b317fed3ed9 Author: jiashuo <[email protected]> AuthorDate: Tue Jun 14 16:02:12 2022 +0800 update --- admin-cli/cmd/table_migrator.go | 12 +++++------ admin-cli/executor/data_version.go | 25 ++++++++++++---------- .../executor/toolkits/tablemigrator/migrator.go | 8 +++---- 3 files changed, 24 insertions(+), 21 deletions(-) diff --git a/admin-cli/cmd/table_migrator.go b/admin-cli/cmd/table_migrator.go index 0b4d13cf..a32910f3 100644 --- a/admin-cli/cmd/table_migrator.go +++ b/admin-cli/cmd/table_migrator.go @@ -10,19 +10,19 @@ func init() { shell.AddCommand(&grumble.Command{ Name: "table-migrator", Help: "migrate table from current cluster to another via table duplication and metaproxy", - Run: func(c *grumble.Context) error { - return tablemigrator.MigrateTable(pegasusClient, c.Flags.String("table"), - c.Flags.String("node"), c.Flags.String("root"), - c.Flags.String("cluster"), c.Flags.String("meta")) - }, Flags: func(f *grumble.Flags) { f.String("t", "table", "", "table name") - f.String("n", "node", "", "zk node: addrs:port, default equal with peagsus "+ + f.String("n", "node", "", "zk node, addrs:port, default equal with peagsus "+ "cluster zk addrs, you can use `cluster-info` to show it") f.String("r", "root", "", "zk root path. 
the tool will update table addrs in "+ "the path of meatproxy, if you don't specify it, that is means user need manual-switch the table addrs") f.String("c", "cluster", "", "target cluster name") f.String("m", "meta", "", "target meta list") }, + Run: func(c *grumble.Context) error { + return tablemigrator.MigrateTable(pegasusClient, c.Flags.String("table"), + c.Flags.String("node"), c.Flags.String("root"), + c.Flags.String("cluster"), c.Flags.String("meta")) + }, }) } diff --git a/admin-cli/executor/data_version.go b/admin-cli/executor/data_version.go index 22138093..bce23beb 100644 --- a/admin-cli/executor/data_version.go +++ b/admin-cli/executor/data_version.go @@ -3,6 +3,7 @@ package executor import ( "encoding/json" "fmt" + "strconv" "github.com/apache/incubator-pegasus/admin-cli/util" "github.com/apache/incubator-pegasus/go-client/session" @@ -33,32 +34,34 @@ func QueryReplicaDataVersion(client *Client, table string) (*TableDataVersion, e args := util.Arguments{ Name: "app_id", - Value: string(resp.AppID), + Value: strconv.Itoa(int(resp.AppID)), } results := util.BatchCallHTTP(nodes, getTableDataVersion, args) - var finalVersion string - var version TableDataVersion + var finalVersion TableDataVersion + versions := make(map[string]TableDataVersion) for _, result := range results { if result.Err != nil { return nil, result.Err } - err := json.Unmarshal([]byte(result.Resp), &version) + err := json.Unmarshal([]byte(result.Resp), &versions) if err != nil { return nil, err } - if finalVersion == "" { - finalVersion = version.DataVersion - } else { - if version.DataVersion == finalVersion { - continue + for _, version := range versions { + if finalVersion.DataVersion == "" { + finalVersion = version } else { - return nil, fmt.Errorf("replica versions are not consistent") + if version.DataVersion == finalVersion.DataVersion { + continue + } else { + return nil, fmt.Errorf("replica versions are not consistent") + } } } } - return &version, nil + return &finalVersion, 
nil } func getTableDataVersion(addr string, args util.Arguments) (string, error) { diff --git a/admin-cli/executor/toolkits/tablemigrator/migrator.go b/admin-cli/executor/toolkits/tablemigrator/migrator.go index 8a3d2a42..33e39fbe 100644 --- a/admin-cli/executor/toolkits/tablemigrator/migrator.go +++ b/admin-cli/executor/toolkits/tablemigrator/migrator.go @@ -15,19 +15,19 @@ func MigrateTable(client *executor.Client, table string, metaProxyZkAddrs string //1. check data version version, err := executor.QueryReplicaDataVersion(client, table) if err != nil { - return nil + return err } if version.DataVersion != "1" { - return fmt.Errorf("not support data version = 0 to migrate by duplication") + return fmt.Errorf("not support migrate table with data_version = %s by duplication", version.DataVersion) } - //2. create data version + //2. create table duplication err = executor.AddDuplication(client, table, targetCluster, false) if err != nil { return err } - //3. check un-confirm decree is less 5k + //3. check un-confirm decree if less 5k nodes := client.Nodes.GetAllNodes(session.NodeTypeReplica) var perfSessions []*aggregate.PerfSession for _, n := range nodes { --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
