This is an automated email from the ASF dual-hosted git repository.

jiashuo pushed a commit to branch add-table-migrator
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git

commit 20a34edcae0d19d987f35009a5d0a331ae5c79b4
Author: jiashuo <[email protected]>
AuthorDate: Thu Jun 16 12:06:23 2022 +0800

    update postion
---
 admin-cli/cmd/table_migrator.go                       |  3 ++-
 admin-cli/executor/toolkits/tablemigrator/migrator.go | 15 ++++++++++-----
 2 files changed, 12 insertions(+), 6 deletions(-)

diff --git a/admin-cli/cmd/table_migrator.go b/admin-cli/cmd/table_migrator.go
index a32910f3..2821cc9e 100644
--- a/admin-cli/cmd/table_migrator.go
+++ b/admin-cli/cmd/table_migrator.go
@@ -18,11 +18,12 @@ func init() {
                                "the path of meatproxy, if you don't specify 
it, that is means user need manual-switch the table addrs")
                        f.String("c", "cluster", "", "target cluster name")
                        f.String("m", "meta", "", "target meta list")
+                       f.Float64("p", "threshold", 100000, "pending mutation 
throshold when server will reject all write request")
                },
                Run: func(c *grumble.Context) error {
                        return tablemigrator.MigrateTable(pegasusClient, 
c.Flags.String("table"),
                                c.Flags.String("node"), c.Flags.String("root"),
-                               c.Flags.String("cluster"), 
c.Flags.String("meta"))
+                               c.Flags.String("cluster"), 
c.Flags.String("meta"), c.Flags.Float64("threshold"))
                },
        })
 }
diff --git a/admin-cli/executor/toolkits/tablemigrator/migrator.go 
b/admin-cli/executor/toolkits/tablemigrator/migrator.go
index da4fe650..b76df808 100644
--- a/admin-cli/executor/toolkits/tablemigrator/migrator.go
+++ b/admin-cli/executor/toolkits/tablemigrator/migrator.go
@@ -11,7 +11,12 @@ import (
        "github.com/pegasus-kv/collector/aggregate"
 )
 
-func MigrateTable(client *executor.Client, table string, metaProxyZkAddrs 
string, metaProxyZkRoot string, targetCluster string, targetAddrs string) error 
{
+var pendingMutationThreshold = 100000.0
+
+func MigrateTable(client *executor.Client, table string, metaProxyZkAddrs 
string, metaProxyZkRoot string, targetCluster string, targetAddrs string, 
threshold float64) error {
+       pendingMutationThreshold = threshold
+       toolkits.LogInfo(fmt.Sprintf("set pendingMutationThreshold = %f means 
if the pending less the value will "+
+               "reject all write and ready to switch cluster", 
pendingMutationThreshold))
        //1. check data version
        toolkits.LogInfo("check data version")
        version, err := executor.QueryReplicaDataVersion(client, table)
@@ -46,7 +51,7 @@ func MigrateTable(client *executor.Client, table string, 
metaProxyZkAddrs string
                }
                perfSessions[n.CombinedAddr()] = perf
        }
-       err = checkUnConfirmedDecree(perfSessions, 5000)
+       err = checkUnConfirmedDecree(perfSessions)
        if err != nil {
                return err
        }
@@ -82,7 +87,7 @@ func MigrateTable(client *executor.Client, table string, 
metaProxyZkAddrs string
        return nil
 }
 
-func checkUnConfirmedDecree(perfSessions map[string]*aggregate.PerfSession, 
threshold float64) error {
+func checkUnConfirmedDecree(perfSessions map[string]*aggregate.PerfSession) 
error {
        completed := false
        for !completed {
                completed = true
@@ -96,14 +101,14 @@ func checkUnConfirmedDecree(perfSessions 
map[string]*aggregate.PerfSession, thre
                                return fmt.Errorf("get pending_mutations_count 
perfcounter size must be 1, but now is %d", len(stats))
                        }
 
-                       if stats[0].Value > threshold {
+                       if stats[0].Value > pendingMutationThreshold {
                                completed = false
                                toolkits.LogInfo(fmt.Sprintf("%s has 
pending_mutations_count %f", addr, stats[0].Value))
                                break
                        }
                }
        }
-       toolkits.LogInfo(fmt.Sprintf("all the node pending_mutations_count has 
less %f", threshold))
+       toolkits.LogInfo(fmt.Sprintf("all the node pending_mutations_count has 
less %f", pendingMutationThreshold))
        time.Sleep(10 * time.Second)
        return nil
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to