This is an automated email from the ASF dual-hosted git repository. jiashuo pushed a commit to branch add-table-migrator in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git
commit bacbd7468073e293655105934ac78d10179d392e Author: jiashuo <[email protected]> AuthorDate: Thu Jun 16 11:41:26 2022 +0800 update postion --- admin-cli/executor/toolkits/tablemigrator/migrator.go | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/admin-cli/executor/toolkits/tablemigrator/migrator.go b/admin-cli/executor/toolkits/tablemigrator/migrator.go index 2954ba4e..da4fe650 100644 --- a/admin-cli/executor/toolkits/tablemigrator/migrator.go +++ b/admin-cli/executor/toolkits/tablemigrator/migrator.go @@ -32,7 +32,7 @@ func MigrateTable(client *executor.Client, table string, metaProxyZkAddrs string //3. check un-confirm decree if less 5k toolkits.LogInfo("check un-confirm decree if less 5k") nodes := client.Nodes.GetAllNodes(session.NodeTypeReplica) - var perfSessions []*aggregate.PerfSession + perfSessions := make(map[string]*aggregate.PerfSession) for _, n := range nodes { if n.Session() == nil { return fmt.Errorf("init node failed = %s", n.TCPAddr()) @@ -44,7 +44,7 @@ func MigrateTable(client *executor.Client, table string, metaProxyZkAddrs string if perf.NodeSession == nil { return fmt.Errorf("session err, node=%s", n.TCPAddr()) } - perfSessions = append(perfSessions, perf) + perfSessions[n.CombinedAddr()] = perf } err = checkUnConfirmedDecree(perfSessions, 5000) if err != nil { @@ -82,12 +82,12 @@ func MigrateTable(client *executor.Client, table string, metaProxyZkAddrs string return nil } -func checkUnConfirmedDecree(perfSessions []*aggregate.PerfSession, threshold float64) error { +func checkUnConfirmedDecree(perfSessions map[string]*aggregate.PerfSession, threshold float64) error { completed := false for !completed { completed = true time.Sleep(10 * time.Second) - for _, perf := range perfSessions { + for addr, perf := range perfSessions { stats, err := perf.GetPerfCounters("pending_mutations_count") if err != nil { return err @@ -98,7 +98,7 @@ func checkUnConfirmedDecree(perfSessions []*aggregate.PerfSession, threshold flo if stats[0].Value > threshold { completed = false - toolkits.LogInfo(fmt.Sprintf("%s has pending_mutations_count %f", perf.Address, stats[0].Value)) + toolkits.LogInfo(fmt.Sprintf("%s has pending_mutations_count %f", addr, stats[0].Value)) break } } @@ -108,18 +108,18 @@ func checkUnConfirmedDecree(perfSessions []*aggregate.PerfSession, threshold flo return nil } -func checkDuplicatingQPS(perfSessions []*aggregate.PerfSession, tableID int32) error { +func checkDuplicatingQPS(perfSessions map[string]*aggregate.PerfSession, tableID int32) error { completed := false counter := fmt.Sprintf("duplicate_qps@%d", tableID) for !completed { completed = true time.Sleep(10 * time.Second) - for _, perf := range perfSessions { + for addr, perf := range perfSessions { stats := util.GetPartitionStat(perf, counter) for gpid, qps := range stats { if qps > 0 { completed = false - toolkits.LogInfo(fmt.Sprintf("%s[%s] still sending pending mutation %f", perf.Address, gpid, qps)) + toolkits.LogInfo(fmt.Sprintf("%s[%s] still sending pending mutation %f", addr, gpid, qps)) break } } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
