This is an automated email from the ASF dual-hosted git repository. jiashuo pushed a commit to branch add-table-migrator in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git
commit 20a34edcae0d19d987f35009a5d0a331ae5c79b4 Author: jiashuo <[email protected]> AuthorDate: Thu Jun 16 12:06:23 2022 +0800 update postion --- admin-cli/cmd/table_migrator.go | 3 ++- admin-cli/executor/toolkits/tablemigrator/migrator.go | 15 ++++++++++----- 2 files changed, 12 insertions(+), 6 deletions(-) diff --git a/admin-cli/cmd/table_migrator.go b/admin-cli/cmd/table_migrator.go index a32910f3..2821cc9e 100644 --- a/admin-cli/cmd/table_migrator.go +++ b/admin-cli/cmd/table_migrator.go @@ -18,11 +18,12 @@ func init() { "the path of meatproxy, if you don't specify it, that is means user need manual-switch the table addrs") f.String("c", "cluster", "", "target cluster name") f.String("m", "meta", "", "target meta list") + f.Float64("p", "threshold", 100000, "pending mutation throshold when server will reject all write request") }, Run: func(c *grumble.Context) error { return tablemigrator.MigrateTable(pegasusClient, c.Flags.String("table"), c.Flags.String("node"), c.Flags.String("root"), - c.Flags.String("cluster"), c.Flags.String("meta")) + c.Flags.String("cluster"), c.Flags.String("meta"), c.Flags.Float64("threshold")) }, }) } diff --git a/admin-cli/executor/toolkits/tablemigrator/migrator.go b/admin-cli/executor/toolkits/tablemigrator/migrator.go index da4fe650..b76df808 100644 --- a/admin-cli/executor/toolkits/tablemigrator/migrator.go +++ b/admin-cli/executor/toolkits/tablemigrator/migrator.go @@ -11,7 +11,12 @@ import ( "github.com/pegasus-kv/collector/aggregate" ) -func MigrateTable(client *executor.Client, table string, metaProxyZkAddrs string, metaProxyZkRoot string, targetCluster string, targetAddrs string) error { +var pendingMutationThreshold = 100000.0 + +func MigrateTable(client *executor.Client, table string, metaProxyZkAddrs string, metaProxyZkRoot string, targetCluster string, targetAddrs string, threshold float64) error { + pendingMutationThreshold = threshold + toolkits.LogInfo(fmt.Sprintf("set pendingMutationThreshold = %f means if the pending less the value will "+ + "reject all write and ready to switch cluster", pendingMutationThreshold)) //1. check data version toolkits.LogInfo("check data version") version, err := executor.QueryReplicaDataVersion(client, table) @@ -46,7 +51,7 @@ func MigrateTable(client *executor.Client, table string, metaProxyZkAddrs string } perfSessions[n.CombinedAddr()] = perf } - err = checkUnConfirmedDecree(perfSessions, 5000) + err = checkUnConfirmedDecree(perfSessions) if err != nil { return err } @@ -82,7 +87,7 @@ func MigrateTable(client *executor.Client, table string, metaProxyZkAddrs string return nil } -func checkUnConfirmedDecree(perfSessions map[string]*aggregate.PerfSession, threshold float64) error { +func checkUnConfirmedDecree(perfSessions map[string]*aggregate.PerfSession) error { completed := false for !completed { completed = true @@ -96,14 +101,14 @@ func checkUnConfirmedDecree(perfSessions map[string]*aggregate.PerfSession, thre return fmt.Errorf("get pending_mutations_count perfcounter size must be 1, but now is %d", len(stats)) } - if stats[0].Value > threshold { + if stats[0].Value > pendingMutationThreshold { completed = false toolkits.LogInfo(fmt.Sprintf("%s has pending_mutations_count %f", addr, stats[0].Value)) break } } } - toolkits.LogInfo(fmt.Sprintf("all the node pending_mutations_count has less %f", threshold)) + toolkits.LogInfo(fmt.Sprintf("all the node pending_mutations_count has less %f", pendingMutationThreshold)) time.Sleep(10 * time.Second) return nil } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
