This is an automated email from the ASF dual-hosted git repository.

yuchenhe pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git


The following commit(s) were added to refs/heads/master by this push:
     new 5278a669 refactor(admin-cli): move disk-balance code into `toolkits` 
package (#958)
5278a669 is described below

commit 5278a669723e1c90e43d104c6207c258cbf15fa5
Author: Jiashuo <[email protected]>
AuthorDate: Fri Apr 29 16:29:12 2022 +0800

    refactor(admin-cli): move disk-balance code into `toolkits` package (#958)
---
 admin-cli/README.md                                |   2 +-
 admin-cli/cmd/disk_balance.go                      |   3 +-
 admin-cli/executor/disk_info.go                    |  28 ++-
 admin-cli/executor/disk_migrate.go                 |  61 +++++
 admin-cli/executor/partition_split.go              |   2 +-
 .../executor/toolkits/diskbalancer/balancer.go     | 107 ++++++++
 .../diskbalancer/helper.go}                        | 272 ++++++---------------
 .../executor/toolkits/{nodesmigrator => }/log.go   |  10 +-
 .../executor/toolkits/nodesmigrator/manager.go     |   9 +-
 .../executor/toolkits/nodesmigrator/migrator.go    |  27 +-
 admin-cli/executor/toolkits/nodesmigrator/nodes.go |  11 +-
 11 files changed, 298 insertions(+), 234 deletions(-)

diff --git a/admin-cli/README.md b/admin-cli/README.md
index f763510a..3db5d255 100644
--- a/admin-cli/README.md
+++ b/admin-cli/README.md
@@ -18,7 +18,7 @@ under the License.
 -->
 # admin-cli
 
-[![Golang Lint and Unit 
Test](https://github.com/apache/incubator-pegasus/actions/workflows/pull-request-go.yml/badge.svg)](https://github.com/apache/incubator-pegasus/actions/workflows/pull-request-go.yml)
+[![Golang Lint and Unit 
Test](https://github.com/apache/incubator-pegasus/actions/workflows/lint_and_test_admin-cli.yml/badge.svg)](https://github.com/apache/incubator-pegasus/actions/workflows/lint_and_test_admin-cli.yml)
 
 The command-line tool for the administration of Pegasus.
 
diff --git a/admin-cli/cmd/disk_balance.go b/admin-cli/cmd/disk_balance.go
index f08f7aed..2a9647ad 100644
--- a/admin-cli/cmd/disk_balance.go
+++ b/admin-cli/cmd/disk_balance.go
@@ -21,6 +21,7 @@ package cmd
 
 import (
        "github.com/apache/incubator-pegasus/admin-cli/executor"
+       
"github.com/apache/incubator-pegasus/admin-cli/executor/toolkits/diskbalancer"
        "github.com/apache/incubator-pegasus/admin-cli/shell"
 
        "github.com/desertbit/grumble"
@@ -60,7 +61,7 @@ func init() {
                        f.Int("i", "interval", 650, "wait disk clean garbage 
replica interval")
                },
                Run: func(c *grumble.Context) error {
-                       return executor.DiskBalance(pegasusClient, 
c.Flags.String("node"), c.Flags.Int64("size"), c.Flags.Int("interval"), 
c.Flags.Bool("auto"))
+                       return diskbalancer.BalanceDiskCapacity(pegasusClient, 
c.Flags.String("node"), c.Flags.Int64("size"), c.Flags.Int("interval"), 
c.Flags.Bool("auto"))
                },
        })
 
diff --git a/admin-cli/executor/disk_info.go b/admin-cli/executor/disk_info.go
index 8f33dd73..24ad8f6a 100644
--- a/admin-cli/executor/disk_info.go
+++ b/admin-cli/executor/disk_info.go
@@ -40,11 +40,11 @@ const (
 
 // QueryDiskInfo command
 func QueryDiskInfo(client *Client, infoType DiskInfoType, replicaServer 
string, tableName string, diskTag string) error {
-       _, err := queryDiskInfo(client, infoType, replicaServer, tableName, 
diskTag, true)
+       _, err := GetDiskInfo(client, infoType, replicaServer, tableName, 
diskTag, true)
        return err
 }
 
-func queryDiskInfo(client *Client, infoType DiskInfoType, replicaServer 
string, tableName string, diskTag string, print bool) ([]interface{}, error) {
+func GetDiskInfo(client *Client, infoType DiskInfoType, replicaServer string, 
tableName string, diskTag string, print bool) ([]interface{}, error) {
        resp, err := sendQueryDiskInfoRequest(client, replicaServer, tableName)
        if err != nil {
                return nil, err
@@ -52,9 +52,9 @@ func queryDiskInfo(client *Client, infoType DiskInfoType, 
replicaServer string,
 
        switch infoType {
        case CapacitySize:
-               return queryDiskCapacity(client, replicaServer, resp, diskTag, 
print), nil
+               return fillDiskCapacity(client, replicaServer, resp, diskTag, 
print), nil
        case ReplicaCount:
-               return queryDiskReplicaCount(client, resp, print), nil
+               return fillDiskReplicaCount(client, resp, print), nil
        default:
                return nil, fmt.Errorf("not support query this disk info: %s", 
infoType)
        }
@@ -110,7 +110,7 @@ type ReplicaCapacityStruct struct {
        Size   int64  `json:"size"`
 }
 
-func queryDiskCapacity(client *Client, replicaServer string, resp 
*radmin.QueryDiskInfoResponse, diskTag string, print bool) []interface{} {
+func fillDiskCapacity(client *Client, replicaServer string, resp 
*radmin.QueryDiskInfoResponse, diskTag string, print bool) []interface{} {
        var diskCapacityInfos []interface{}
        var replicaCapacityInfos []interface{}
 
@@ -155,7 +155,7 @@ func queryDiskCapacity(client *Client, replicaServer 
string, resp *radmin.QueryD
        return diskCapacityInfos
 }
 
-func queryDiskReplicaCount(client *Client, resp *radmin.QueryDiskInfoResponse, 
print bool) []interface{} {
+func fillDiskReplicaCount(client *Client, resp *radmin.QueryDiskInfoResponse, 
print bool) []interface{} {
        type ReplicaCountStruct struct {
                Disk      string `json:"disk"`
                Primary   int    `json:"primary"`
@@ -213,3 +213,19 @@ func AddDisk(client *Client, replicaServer string, diskStr 
string) error {
        fmt.Printf("Node[%s] add new disk succeed\n", replicaServer)
        return nil
 }
+
+func ConvertReplicaCapacityStruct(replicaCapacityInfos []interface{}) 
([]ReplicaCapacityStruct, error) {
+       util.SortStructsByField(replicaCapacityInfos, "Size")
+       var replicas []ReplicaCapacityStruct
+       for _, replica := range replicaCapacityInfos {
+               if r, ok := replica.(ReplicaCapacityStruct); ok {
+                       replicas = append(replicas, r)
+               } else {
+                       return nil, fmt.Errorf("can't covert to 
ReplicaCapacityStruct")
+               }
+       }
+       if replicas == nil {
+               return nil, fmt.Errorf("the disk has no replica")
+       }
+       return replicas, nil
+}
diff --git a/admin-cli/executor/disk_migrate.go 
b/admin-cli/executor/disk_migrate.go
new file mode 100644
index 00000000..4da4b01e
--- /dev/null
+++ b/admin-cli/executor/disk_migrate.go
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package executor
+
+import (
+       "context"
+       "fmt"
+       "time"
+
+       "github.com/XiaoMi/pegasus-go-client/idl/radmin"
+       "github.com/XiaoMi/pegasus-go-client/session"
+       "github.com/apache/incubator-pegasus/admin-cli/util"
+)
+
+func DiskMigrate(client *Client, replicaServer string, pidStr string, from 
string, to string) error {
+       ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
+       defer cancel()
+
+       pid, err := util.Str2Gpid(pidStr)
+       if err != nil {
+               return err
+       }
+
+       node, err := client.Nodes.GetNode(replicaServer, 
session.NodeTypeReplica)
+       if err != nil {
+               return err
+       }
+       replica := node.Replica()
+
+       resp, err := replica.DiskMigrate(ctx, &radmin.ReplicaDiskMigrateRequest{
+               Pid:        pid,
+               OriginDisk: from,
+               TargetDisk: to,
+       })
+
+       if err != nil {
+               if resp != nil && resp.Hint != nil {
+                       return fmt.Errorf("Internal server error [%s:%s]", err, 
*resp.Hint)
+               }
+               return err
+       }
+
+       return nil
+}
diff --git a/admin-cli/executor/partition_split.go 
b/admin-cli/executor/partition_split.go
index 307d13a4..10bab4d9 100644
--- a/admin-cli/executor/partition_split.go
+++ b/admin-cli/executor/partition_split.go
@@ -134,7 +134,7 @@ func DiskBeforeSplit(client *Client, tableName string) 
error {
        for address, resp := range respMap {
                for _, diskInfo := range resp.GetDiskInfos() {
                        diskTag := diskInfo.GetTag()
-                       rCapacityList, err := 
convertReplicaCapacityStruct(queryDiskCapacity(client, address, resp, diskTag, 
false))
+                       rCapacityList, err := 
ConvertReplicaCapacityStruct(fillDiskCapacity(client, address, resp, diskTag, 
false))
                        if err != nil {
                                return fmt.Errorf("%s [hint: failed to get info 
for node(%s) disk(%s) when disk check before split]", err, address, diskTag)
                        }
diff --git a/admin-cli/executor/toolkits/diskbalancer/balancer.go 
b/admin-cli/executor/toolkits/diskbalancer/balancer.go
new file mode 100644
index 00000000..eca177c9
--- /dev/null
+++ b/admin-cli/executor/toolkits/diskbalancer/balancer.go
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package diskbalancer
+
+import (
+       "fmt"
+       "strings"
+       "time"
+
+       "github.com/XiaoMi/pegasus-go-client/idl/base"
+       "github.com/apache/incubator-pegasus/admin-cli/executor"
+       "github.com/apache/incubator-pegasus/admin-cli/executor/toolkits"
+)
+
+var WaitRunning = time.Second * 10  // time for wait migrate complete
+var WaitCleaning = time.Second * 90 // time for wait garbage replica to clean 
complete
+
+var ShortCleanInterval = "1"    // short time wait the disk cleaner clean 
garbage replica
+var LongCleanInterval = "86400" // long time wait the disk cleaner clean 
garbage replica
+
+// auto balance target node disk usage:
+// -1. change the pegasus server disk cleaner interval for clean temp replica 
to free disk space in time
+// -2. get the optimal migrate action to be ready to balance the disk until 
can't migrate base latest disk space stats
+// -3. if current replica is `primary` status, force assign the replica to 
`secondary` status
+// -4. migrate the replica base `getNextMigrateAction` result
+// -5. loop query migrate progress using `DiskMigrate`, it will response 
`ERR_BUSY` if running
+// -6. start next loop until can't allow to balance the node
+// -7. recover disk cleaner interval if balance complete
+// -8. set meta status to `lively` to balance primary and secondary // 
TODO(jiashuo1)
+func BalanceDiskCapacity(client *executor.Client, replicaServer string, 
minSize int64, interval int, auto bool) error {
+       WaitCleaning = time.Second * time.Duration(interval)
+
+       if err := changeDiskCleanerInterval(client, replicaServer, 
ShortCleanInterval); err != nil {
+               return err
+       }
+       defer func() {
+               if err := changeDiskCleanerInterval(client, replicaServer, 
LongCleanInterval); err != nil {
+                       toolkits.LogWarn("revert disk cleaner failed")
+               }
+       }()
+
+       for {
+               action, err := getNextMigrateAction(client, replicaServer, 
minSize)
+               if err != nil {
+                       return err
+               }
+
+               err = executor.DiskMigrate(client, replicaServer, 
action.replica.Gpid, action.from, action.to)
+               if err != nil && action.replica.Status != "secondary" {
+                       err := forceAssignReplicaToSecondary(client, 
replicaServer, action.replica.Gpid)
+                       if err != nil {
+                               return err
+                       }
+                       time.Sleep(WaitRunning)
+                       continue
+               }
+
+               if err != nil {
+                       return fmt.Errorf("migrate(%s) start failed[auto=%v] 
err = %s", action.toString(), auto, err.Error())
+               }
+
+               toolkits.LogInfo(fmt.Sprintf("migrate(%s) has started[auto=%v], 
wait complete...\n", action.toString(), auto))
+               for {
+                       // TODO(jiashuo1): using DiskMigrate RPC to query 
status, consider support queryDiskMigrateStatus RPC
+                       err = executor.DiskMigrate(client, replicaServer, 
action.replica.Gpid, action.from, action.to)
+                       if err == nil {
+                               time.Sleep(WaitRunning)
+                               continue
+                       }
+                       if strings.Contains(err.Error(), 
base.ERR_BUSY.String()) {
+                               toolkits.LogInfo(fmt.Sprintf("migrate(%s) is 
running, msg=%s, wait complete...\n", action.toString(), err.Error()))
+                               time.Sleep(WaitRunning)
+                               continue
+                       }
+                       toolkits.LogInfo(fmt.Sprintf("migrate(%s) is 
completed,result=%s, wait[%ds] disk cleaner remove garbage...\n\n",
+                               action.toString(), err.Error(), interval))
+                       break
+               }
+
+               if auto {
+                       time.Sleep(WaitCleaning)
+                       continue
+               }
+
+               time.Sleep(WaitCleaning)
+               toolkits.LogInfo(fmt.Sprintf("you now disable 
auto-balance[auto=%v], stop and wait manual loop\n\n", auto))
+               break
+       }
+       return nil
+}
diff --git a/admin-cli/executor/disk_balance.go 
b/admin-cli/executor/toolkits/diskbalancer/helper.go
similarity index 56%
rename from admin-cli/executor/disk_balance.go
rename to admin-cli/executor/toolkits/diskbalancer/helper.go
index 82ac17ed..09096e90 100644
--- a/admin-cli/executor/disk_balance.go
+++ b/admin-cli/executor/toolkits/diskbalancer/helper.go
@@ -17,131 +17,23 @@
  * under the License.
  */
 
-package executor
+package diskbalancer
 
 import (
-       "context"
        "fmt"
        "math"
-       "strings"
-       "time"
 
        "github.com/XiaoMi/pegasus-go-client/idl/admin"
-       "github.com/XiaoMi/pegasus-go-client/idl/base"
-       "github.com/XiaoMi/pegasus-go-client/idl/radmin"
        "github.com/XiaoMi/pegasus-go-client/session"
        adminClient "github.com/apache/incubator-pegasus/admin-cli/client"
+       "github.com/apache/incubator-pegasus/admin-cli/executor"
+       "github.com/apache/incubator-pegasus/admin-cli/executor/toolkits"
        "github.com/apache/incubator-pegasus/admin-cli/util"
 )
 
-func DiskMigrate(client *Client, replicaServer string, pidStr string, from 
string, to string) error {
-       ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
-       defer cancel()
-
-       pid, err := util.Str2Gpid(pidStr)
-       if err != nil {
-               return err
-       }
-
-       node, err := client.Nodes.GetNode(replicaServer, 
session.NodeTypeReplica)
-       if err != nil {
-               return err
-       }
-       replica := node.Replica()
-
-       resp, err := replica.DiskMigrate(ctx, &radmin.ReplicaDiskMigrateRequest{
-               Pid:        pid,
-               OriginDisk: from,
-               TargetDisk: to,
-       })
-
-       if err != nil {
-               if resp != nil && resp.Hint != nil {
-                       return fmt.Errorf("Internal server error [%s:%s]", err, 
*resp.Hint)
-               }
-               return err
-       }
-
-       return nil
-}
-
-var WaitRunning = time.Second * 10  // time for wait migrate complete
-var WaitCleaning = time.Second * 90 // time for wait garbage replica to clean 
complete
-
-// auto balance target node disk usage:
-// -1. change the pegasus server disk cleaner internal for clean temp replica 
to free disk space in time
-// -2. get the optimal migrate action to be ready to balance the disk until 
can't migrate base latest disk space stats
-// -3. if current replica is `primary` status, force assign the replica to 
`secondary` status
-// -4. migrate the replica base `getNextMigrateAction` result
-// -5. loop query migrate progress using `DiskMigrate`, it will response 
`ERR_BUSY` if running
-// -6. start next loop until can't allow to balance the node
-// -7. recover disk cleaner internal if balance complete
-// -8. set meta status to `lively` to balance primary and secondary // 
TODO(jiashuo1)
-func DiskBalance(client *Client, replicaServer string, minSize int64, interval 
int, auto bool) error {
-       WaitCleaning = time.Second * time.Duration(interval)
-
-       if err := changeDiskCleanerInterval(client, replicaServer, "1"); err != 
nil {
-               return err
-       }
-       defer func() {
-               if err := changeDiskCleanerInterval(client, replicaServer, 
"86400"); err != nil {
-                       fmt.Println("revert disk cleaner failed")
-               }
-       }()
-
-       for {
-               action, err := getNextMigrateAction(client, replicaServer, 
minSize)
-               if err != nil {
-                       return err
-               }
-
-               err = DiskMigrate(client, replicaServer, action.replica.Gpid, 
action.from, action.to)
-               if err != nil && action.replica.Status != "secondary" {
-                       err := forceAssignReplicaToSecondary(client, 
replicaServer, action.replica.Gpid)
-                       if err != nil {
-                               return err
-                       }
-                       time.Sleep(WaitRunning)
-                       continue
-               }
-
-               if err != nil {
-                       return fmt.Errorf("migrate(%s) start failed[auto=%v] 
err = %s", action.toString(), auto, err.Error())
-               }
-
-               fmt.Printf("migrate(%s) has started[auto=%v], wait 
complete...\n", action.toString(), auto)
-               for {
-                       // TODO(jiashuo1): using DiskMigrate RPC to query 
status, consider support queryDiskMigrateStatus RPC
-                       err = DiskMigrate(client, replicaServer, 
action.replica.Gpid, action.from, action.to)
-                       if err == nil {
-                               time.Sleep(WaitRunning)
-                               continue
-                       }
-                       if strings.Contains(err.Error(), 
base.ERR_BUSY.String()) {
-                               fmt.Printf("migrate(%s) is running, msg=%s, 
wait complete...\n", action.toString(), err.Error())
-                               time.Sleep(WaitRunning)
-                               continue
-                       }
-                       fmt.Printf("migrate(%s) is completed,result=%s, 
wait[%ds] disk cleaner remove garbage...\n\n",
-                               action.toString(), err.Error(), interval)
-                       break
-               }
-
-               if auto {
-                       time.Sleep(WaitCleaning)
-                       continue
-               }
-
-               time.Sleep(WaitCleaning)
-               fmt.Printf("you now disable auto-balance[auto=%v], stop and 
wait manual loop\n\n", auto)
-               break
-       }
-       return nil
-}
-
 type DiskStats struct {
-       diskCapacity    DiskCapacityStruct
-       replicaCapacity []ReplicaCapacityStruct
+       diskCapacity    executor.DiskCapacityStruct
+       replicaCapacity []executor.ReplicaCapacityStruct
 }
 
 type MigrateDisk struct {
@@ -159,7 +51,7 @@ func (m *MigrateDisk) toString() string {
 
 type MigrateAction struct {
        node    string
-       replica ReplicaCapacityStruct
+       replica executor.ReplicaCapacityStruct
        from    string
        to      string
 }
@@ -168,14 +60,14 @@ func (m *MigrateAction) toString() string {
        return fmt.Sprintf("node=%s, replica=%s, %s=>%s", m.node, 
m.replica.Gpid, m.from, m.to)
 }
 
-func changeDiskCleanerInterval(client *Client, replicaServer string, 
cleanInterval string) error {
-       fmt.Printf("set gc_disk_migration_origin_replica_interval_seconds = %ss 
", cleanInterval)
-       err := ConfigCommand(client, session.NodeTypeReplica, replicaServer,
+func changeDiskCleanerInterval(client *executor.Client, replicaServer string, 
cleanInterval string) error {
+       toolkits.LogInfo(fmt.Sprintf("set 
gc_disk_migration_origin_replica_interval_seconds = %ss ", cleanInterval))
+       err := executor.ConfigCommand(client, session.NodeTypeReplica, 
replicaServer,
                "gc_disk_migration_origin_replica_interval_seconds", "set", 
cleanInterval)
        return err
 }
 
-func getNextMigrateAction(client *Client, replicaServer string, minSize int64) 
(*MigrateAction, error) {
+func getNextMigrateAction(client *executor.Client, replicaServer string, 
minSize int64) (*MigrateAction, error) {
        disks, totalUsage, totalCapacity, err := queryDiskCapacityInfo(client, 
replicaServer)
        if err != nil {
                return nil, err
@@ -192,73 +84,17 @@ func getNextMigrateAction(client *Client, replicaServer 
string, minSize int64) (
        return migrateAction, nil
 }
 
-func forceAssignReplicaToSecondary(client *Client, replicaServer string, gpid 
string) error {
-       fmt.Printf("WARNING: the select replica is not secondary, will force 
assign it secondary\n")
-       if _, err := 
client.Meta.MetaControl(admin.MetaFunctionLevel_fl_steady); err != nil {
-               return err
-       }
-       secondaryNode, err := getReplicaSecondaryNode(client, gpid)
-       if err != nil {
-               return err
-       }
-       replica, err := util.Str2Gpid(gpid)
-       if err != nil {
-               return err
-       }
-       return client.Meta.Balance(replica, adminClient.BalanceMovePri,
-               util.NewNodeFromTCPAddr(replicaServer, 
session.NodeTypeReplica), secondaryNode)
-}
-
-func getReplicaSecondaryNode(client *Client, gpid string) (*util.PegasusNode, 
error) {
-       replica, err := util.Str2Gpid(gpid)
-       if err != nil {
-               return nil, err
-       }
-       tables, err := client.Meta.ListApps(admin.AppStatus_AS_AVAILABLE)
-       if err != nil {
-               return nil, fmt.Errorf("can't get the table name of replica %s 
when migrate the replica", gpid)
-       }
-       var tableName string
-       for _, tb := range tables {
-               if tb.AppID == replica.Appid {
-                       tableName = tb.AppName
-                       break
-               }
-       }
-       if tableName == "" {
-               return nil, fmt.Errorf("can't find the table for %s when 
migrate the replica", gpid)
-       }
-
-       resp, err := client.Meta.QueryConfig(tableName)
-       if err != nil {
-               return nil, fmt.Errorf("can't get the table %s configuration 
when migrate the replica(%s): %s",
-                       tableName, gpid, err)
-       }
-
-       var secondaryNode *util.PegasusNode
-       for _, partition := range resp.Partitions {
-               if partition.Pid.String() == replica.String() {
-                       secondaryNode = 
util.NewNodeFromTCPAddr(partition.Secondaries[0].GetAddress(), 
session.NodeTypeReplica)
-               }
-       }
-
-       if secondaryNode == nil {
-               return nil, fmt.Errorf("can't get the replica %s secondary 
node", gpid)
-       }
-       return secondaryNode, nil
-}
-
-func queryDiskCapacityInfo(client *Client, replicaServer string) 
([]DiskCapacityStruct, int64, int64, error) {
-       diskCapacityOnNode, err := queryDiskInfo(client, CapacitySize, 
replicaServer, "", "", false)
+func queryDiskCapacityInfo(client *executor.Client, replicaServer string) 
([]executor.DiskCapacityStruct, int64, int64, error) {
+       diskCapacityOnNode, err := executor.GetDiskInfo(client, 
executor.CapacitySize, replicaServer, "", "", false)
        if err != nil {
                return nil, 0, 0, err
        }
        util.SortStructsByField(diskCapacityOnNode, "Usage")
-       var disks []DiskCapacityStruct
+       var disks []executor.DiskCapacityStruct
        var totalUsage int64
        var totalCapacity int64
        for _, disk := range diskCapacityOnNode {
-               if s, ok := disk.(DiskCapacityStruct); ok {
+               if s, ok := disk.(executor.DiskCapacityStruct); ok {
                        disks = append(disks, s)
                        totalUsage += s.Usage
                        totalCapacity += s.Capacity
@@ -277,15 +113,15 @@ func queryDiskCapacityInfo(client *Client, replicaServer 
string) ([]DiskCapacity
        return disks, totalUsage, totalCapacity, nil
 }
 
-func getMigrateDiskInfo(client *Client, replicaServer string, disks 
[]DiskCapacityStruct,
+func getMigrateDiskInfo(client *executor.Client, replicaServer string, disks 
[]executor.DiskCapacityStruct,
        totalUsage int64, totalCapacity int64) (*MigrateDisk, error) {
        highUsageDisk := disks[len(disks)-1]
-       highDiskInfo, err := queryDiskInfo(client, CapacitySize, replicaServer, 
"", highUsageDisk.Disk, false)
+       highDiskInfo, err := executor.GetDiskInfo(client, 
executor.CapacitySize, replicaServer, "", highUsageDisk.Disk, false)
        if err != nil {
                return nil, err
        }
        lowUsageDisk := disks[0]
-       lowDiskInfo, err := queryDiskInfo(client, CapacitySize, replicaServer, 
"", lowUsageDisk.Disk, false)
+       lowDiskInfo, err := executor.GetDiskInfo(client, executor.CapacitySize, 
replicaServer, "", lowUsageDisk.Disk, false)
        if err != nil {
                return nil, err
        }
@@ -305,13 +141,13 @@ func getMigrateDiskInfo(client *Client, replicaServer 
string, disks []DiskCapaci
                        lowUsageDisk.Usage, lowUsageDisk.Ratio, averageUsage, 
averageRatio)
        }
 
-       replicaCapacityOnHighDisk, err := 
convertReplicaCapacityStruct(highDiskInfo)
-       if err != nil { // we need migrate replica from high disk, so the 
convert must be successful
+       replicaCapacityOnHighDisk, err := 
executor.ConvertReplicaCapacityStruct(highDiskInfo)
+       if err != nil {
                return nil, fmt.Errorf("parse replica info on high disk(%s) 
failed: %s", highUsageDisk.Disk, err.Error())
        }
-       replicaCapacityOnLowDisk, err := 
convertReplicaCapacityStruct(lowDiskInfo)
-       if err != nil { // we don't care the replica capacity info on low disk, 
if convert failed, we only log warning and know the result=nil
-               fmt.Printf("WARNING: parse replica info on low disk(%s) failed: 
%s", lowUsageDisk.Disk, err.Error())
+       replicaCapacityOnLowDisk, err := 
executor.ConvertReplicaCapacityStruct(lowDiskInfo)
+       if err != nil {
+               return nil, fmt.Errorf("parse replica info on low disk(%s) 
failed: %s", highUsageDisk.Disk, err.Error())
        }
        return &MigrateDisk{
                averageUsage: averageUsage,
@@ -332,7 +168,7 @@ func computeMigrateAction(migrate *MigrateDisk, minSize 
int64) (*MigrateAction,
        highDiskCanSendMax := migrate.highDisk.diskCapacity.Usage - 
migrate.averageUsage
        sizeNeedMove := int64(math.Min(float64(lowDiskCanReceiveMax), 
float64(highDiskCanSendMax)))
 
-       var selectReplica *ReplicaCapacityStruct
+       var selectReplica *executor.ReplicaCapacityStruct
        for i := len(migrate.highDisk.replicaCapacity) - 1; i >= 0; i-- {
                if migrate.highDisk.replicaCapacity[i].Size > sizeNeedMove {
                        continue
@@ -367,18 +203,58 @@ func computeMigrateAction(migrate *MigrateDisk, minSize 
int64) (*MigrateAction,
        }, nil
 }
 
-func convertReplicaCapacityStruct(replicaCapacityInfos []interface{}) 
([]ReplicaCapacityStruct, error) {
-       util.SortStructsByField(replicaCapacityInfos, "Size")
-       var replicas []ReplicaCapacityStruct
-       for _, replica := range replicaCapacityInfos {
-               if r, ok := replica.(ReplicaCapacityStruct); ok {
-                       replicas = append(replicas, r)
-               } else {
-                       return nil, fmt.Errorf("can't covert to 
ReplicaCapacityStruct")
+func forceAssignReplicaToSecondary(client *executor.Client, replicaServer 
string, gpid string) error {
+       fmt.Printf("WARNING: the select replica is not secondary, will force 
assign it secondary\n")
+       if _, err := 
client.Meta.MetaControl(admin.MetaFunctionLevel_fl_steady); err != nil {
+               return err
+       }
+       secondaryNode, err := getReplicaSecondaryNode(client, gpid)
+       if err != nil {
+               return err
+       }
+       replica, err := util.Str2Gpid(gpid)
+       if err != nil {
+               return err
+       }
+       return client.Meta.Balance(replica, adminClient.BalanceMovePri,
+               util.NewNodeFromTCPAddr(replicaServer, 
session.NodeTypeReplica), secondaryNode)
+}
+
+func getReplicaSecondaryNode(client *executor.Client, gpid string) 
(*util.PegasusNode, error) {
+       replica, err := util.Str2Gpid(gpid)
+       if err != nil {
+               return nil, err
+       }
+       tables, err := client.Meta.ListApps(admin.AppStatus_AS_AVAILABLE)
+       if err != nil {
+               return nil, fmt.Errorf("can't get the table name of replica %s 
when migrate the replica", gpid)
+       }
+       var tableName string
+       for _, tb := range tables {
+               if tb.AppID == replica.Appid {
+                       tableName = tb.AppName
+                       break
+               }
+       }
+       if tableName == "" {
+               return nil, fmt.Errorf("can't find the table for %s when 
migrate the replica", gpid)
+       }
+
+       resp, err := client.Meta.QueryConfig(tableName)
+       if err != nil {
+               return nil, fmt.Errorf("can't get the table %s configuration 
when migrate the replica(%s): %s",
+                       tableName, gpid, err)
+       }
+
+       var secondaryNode *util.PegasusNode
+       for _, partition := range resp.Partitions {
+               if partition.Pid.String() == replica.String() {
+                       secondaryNode = 
util.NewNodeFromTCPAddr(partition.Secondaries[0].GetAddress(), 
session.NodeTypeReplica)
                }
        }
-       if replicas == nil {
-               return nil, fmt.Errorf("the disk has no replica")
+
+       if secondaryNode == nil {
+               return nil, fmt.Errorf("can't get the replica %s secondary 
node", gpid)
        }
-       return replicas, nil
+       return secondaryNode, nil
 }
diff --git a/admin-cli/executor/toolkits/nodesmigrator/log.go 
b/admin-cli/executor/toolkits/log.go
similarity index 88%
rename from admin-cli/executor/toolkits/nodesmigrator/log.go
rename to admin-cli/executor/toolkits/log.go
index 71fabd75..4ac2d85a 100644
--- a/admin-cli/executor/toolkits/nodesmigrator/log.go
+++ b/admin-cli/executor/toolkits/log.go
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package nodesmigrator
+package toolkits
 
 import (
        "fmt"
@@ -25,21 +25,21 @@ import (
        "github.com/sirupsen/logrus"
 )
 
-func logInfo(log string) {
+func LogInfo(log string) {
        fmt.Printf("INFO: %s\n", log)
        logrus.Info(log)
 }
 
-func logWarn(log string) {
+func LogWarn(log string) {
        fmt.Printf("WARN: %s\n", log)
        logrus.Warn(log)
 }
 
-func logDebug(log string) {
+func LogDebug(log string) {
        logrus.Debug(log)
 }
 
-func logPanic(log string) {
+func LogPanic(log string) {
        fmt.Println(log)
        logrus.Panic(log)
 }
diff --git a/admin-cli/executor/toolkits/nodesmigrator/manager.go 
b/admin-cli/executor/toolkits/nodesmigrator/manager.go
index 6a5c9088..84f8934f 100644
--- a/admin-cli/executor/toolkits/nodesmigrator/manager.go
+++ b/admin-cli/executor/toolkits/nodesmigrator/manager.go
@@ -28,6 +28,7 @@ import (
 
        "github.com/XiaoMi/pegasus-go-client/idl/admin"
        "github.com/apache/incubator-pegasus/admin-cli/executor"
+       "github.com/apache/incubator-pegasus/admin-cli/executor/toolkits"
        "github.com/apache/incubator-pegasus/admin-cli/util"
 )
 
@@ -39,11 +40,11 @@ func MigrateAllReplicaToNodes(client *executor.Client, from 
[]string, to []strin
        GlobalBatchTable = batchTable
        GlobalBatchTarget = batchTarget
        if len(to) != targetCount {
-               logPanic(fmt.Sprintf("please make sure the targets count == 
`--final_target` value: %d vs %d", len(to),
+               toolkits.LogPanic(fmt.Sprintf("please make sure the targets 
count == `--final_target` value: %d vs %d", len(to),
                        targetCount))
        }
 
-       logWarn(fmt.Sprintf("you now migrate to target count assign to be %d in 
final, "+
+       toolkits.LogWarn(fmt.Sprintf("you now migrate to target count assign to 
be %d in final, "+
                "please make sure it is ok! sleep 10s and then start", 
targetCount))
        time.Sleep(time.Second * 10)
 
@@ -75,14 +76,14 @@ func MigrateAllReplicaToNodes(client *executor.Client, from 
[]string, to []strin
        balanceFactor := 0
        for {
                if totalRemainingReplica <= 0 {
-                       logInfo("\n\n==============completed for all origin 
nodes has been migrated================")
+                       toolkits.LogInfo("\n\n==============completed for all 
origin nodes has been migrated================")
                        return executor.ListNodes(client)
                }
 
                if currentOriginNode.String() == firstOrigin.String() {
                        originRound++
                }
-               logInfo(fmt.Sprintf("\n\n*******************[%d|%s]start 
migrate replicas, remainingReplica=%d*****************",
+               
toolkits.LogInfo(fmt.Sprintf("\n\n*******************[%d|%s]start migrate 
replicas, remainingReplica=%d*****************",
                        balanceFactor, currentOriginNode.String(), 
totalRemainingReplica))
 
                currentOriginNode.downgradeAllReplicaToSecondary(client)
diff --git a/admin-cli/executor/toolkits/nodesmigrator/migrator.go 
b/admin-cli/executor/toolkits/nodesmigrator/migrator.go
index 61304d07..6899b738 100644
--- a/admin-cli/executor/toolkits/nodesmigrator/migrator.go
+++ b/admin-cli/executor/toolkits/nodesmigrator/migrator.go
@@ -32,6 +32,7 @@ import (
        "github.com/XiaoMi/pegasus-go-client/session"
        migrator "github.com/apache/incubator-pegasus/admin-cli/client"
        "github.com/apache/incubator-pegasus/admin-cli/executor"
+       "github.com/apache/incubator-pegasus/admin-cli/executor/toolkits"
        "github.com/apache/incubator-pegasus/admin-cli/util"
 )
 
@@ -52,7 +53,7 @@ func (m *Migrator) run(client *executor.Client, table string, 
balanceFactor int,
        for {
                target := m.selectNextTargetNode(targets)
                if target.String() == origin.String() {
-                       logInfo(fmt.Sprintf("completed for origin and target is 
same: %s", origin.String()))
+                       toolkits.LogInfo(fmt.Sprintf("completed for origin and 
target is same: %s", origin.String()))
                        return m.getTotalRemainingReplicaCount()
                }
 
@@ -60,7 +61,7 @@ func (m *Migrator) run(client *executor.Client, table string, 
balanceFactor int,
                m.updateOngoingActionList()
                remainingCount := m.getRemainingReplicaCount(origin)
                if remainingCount <= 0 || 
len(balanceTargets)+len(invalidTargets) >= len(targets) {
-                       logInfo(fmt.Sprintf("[%s]completed(remaining=%d, 
balance=%d, invalid=%d, running_target=%d, final_target=%d) for no replicas can 
be migrated",
+                       
toolkits.LogInfo(fmt.Sprintf("[%s]completed(remaining=%d, balance=%d, 
invalid=%d, running_target=%d, final_target=%d) for no replicas can be 
migrated",
                                table, remainingCount, len(balanceTargets), 
len(invalidTargets), len(targets), len(m.targets)))
                        return m.getTotalRemainingReplicaCount()
                }
@@ -69,7 +70,7 @@ func (m *Migrator) run(client *executor.Client, table string, 
balanceFactor int,
                currentCount := m.getCurrentReplicaCount(target)
                if currentCount >= expectCount {
                        balanceTargets[target.String()] = 1
-                       logDebug(fmt.Sprintf("[%s]balance: no need migrate 
replicas to %s, current=%d, expect=max(%d), total_balance=%d",
+                       toolkits.LogDebug(fmt.Sprintf("[%s]balance: no need 
migrate replicas to %s, current=%d, expect=max(%d), total_balance=%d",
                                table, target.String(), currentCount, 
expectCount, len(balanceTargets)))
                        if len(m.ongoingActions.actionList) > 0 {
                                time.Sleep(10 * time.Second)
@@ -79,7 +80,7 @@ func (m *Migrator) run(client *executor.Client, table string, 
balanceFactor int,
 
                if !m.existValidReplica(origin, target) {
                        invalidTargets[target.String()] = 1
-                       logDebug(fmt.Sprintf("[%s]invalid: no invalid migrate 
replicas to %s, total_invalid=%d",
+                       toolkits.LogDebug(fmt.Sprintf("[%s]invalid: no invalid 
migrate replicas to %s, total_invalid=%d",
                                table, target.String(), len(invalidTargets)))
                        if len(m.ongoingActions.actionList) > 0 {
                                time.Sleep(10 * time.Second)
@@ -89,7 +90,7 @@ func (m *Migrator) run(client *executor.Client, table string, 
balanceFactor int,
 
                currentConcurrentCount := target.concurrent(m.ongoingActions)
                if currentConcurrentCount == maxConcurrent {
-                       logDebug(fmt.Sprintf("[%s] %s has excceed the max 
concurrent = %d", table, target.String(),
+                       toolkits.LogDebug(fmt.Sprintf("[%s] %s has excceed the 
max concurrent = %d", table, target.String(),
                                currentConcurrentCount))
                        time.Sleep(10 * time.Second)
                        continue
@@ -118,7 +119,7 @@ func (m *Migrator) selectNextTargetNode(targets 
[]*util.PegasusNode) *MigratorNo
 func (m *Migrator) updateNodesReplicaInfo(client *executor.Client, table 
string) {
        for {
                if err := m.syncNodesReplicaInfo(client, table); err != nil {
-                       logDebug(fmt.Sprintf("[%s]table may be unhealthy: %s", 
table, err.Error()))
+                       toolkits.LogDebug(fmt.Sprintf("[%s]table may be 
unhealthy: %s", table, err.Error()))
                        time.Sleep(10 * time.Second)
                        continue
                }
@@ -240,7 +241,7 @@ func (m *Migrator) sendMigrateRequest(client 
*executor.Client, table string, ori
        from := m.nodes[origin.String()]
        to := m.nodes[target.String()]
        if len(from.replicas) == 0 {
-               logDebug(fmt.Sprintf("the node[%s] has no replica to migrate", 
target.node.String()))
+               toolkits.LogDebug(fmt.Sprintf("the node[%s] has no replica to 
migrate", target.node.String()))
                return
        }
 
@@ -255,12 +256,12 @@ func (m *Migrator) sendMigrateRequest(client 
*executor.Client, table string, ori
                }
 
                if to.contain(replica.gpid) {
-                       logDebug(fmt.Sprintf("actions[%s] target has existed 
the replica", action.toString()))
+                       toolkits.LogDebug(fmt.Sprintf("actions[%s] target has 
existed the replica", action.toString()))
                        continue
                }
 
                if m.totalActions.exist(action) {
-                       logDebug(fmt.Sprintf("action[%s] has assgin other 
task", action.toString()))
+                       toolkits.LogDebug(fmt.Sprintf("action[%s] has assgin 
other task", action.toString()))
                        continue
                }
 
@@ -270,10 +271,10 @@ func (m *Migrator) sendMigrateRequest(client 
*executor.Client, table string, ori
                if err != nil {
                        m.totalActions.delete(action)
                        m.ongoingActions.delete(action)
-                       logWarn(fmt.Sprintf("send failed: %s", err.Error()))
+                       toolkits.LogWarn(fmt.Sprintf("send failed: %s", 
err.Error()))
                        continue
                }
-               logInfo(fmt.Sprintf("[%s]send %s success, ongiong task = %d", 
table, action.toString(), target.concurrent(m.ongoingActions)))
+               toolkits.LogInfo(fmt.Sprintf("[%s]send %s success, ongiong task 
= %d", table, action.toString(), target.concurrent(m.ongoingActions)))
                return
        }
 }
@@ -290,10 +291,10 @@ func (m *Migrator) updateOngoingActionList() {
        for name, act := range m.ongoingActions.actionList {
                node := m.nodes[act.to.String()]
                if node.contain(act.replica.gpid) {
-                       logInfo(fmt.Sprintf("%s has completed", name))
+                       toolkits.LogInfo(fmt.Sprintf("%s has completed", name))
                        m.ongoingActions.delete(act)
                } else {
-                       logInfo(fmt.Sprintf("%s is running", name))
+                       toolkits.LogInfo(fmt.Sprintf("%s is running", name))
                }
        }
 }
diff --git a/admin-cli/executor/toolkits/nodesmigrator/nodes.go 
b/admin-cli/executor/toolkits/nodesmigrator/nodes.go
index 43d2bb3b..dc56b059 100644
--- a/admin-cli/executor/toolkits/nodesmigrator/nodes.go
+++ b/admin-cli/executor/toolkits/nodesmigrator/nodes.go
@@ -26,6 +26,7 @@ import (
        "github.com/XiaoMi/pegasus-go-client/idl/base"
        migrator "github.com/apache/incubator-pegasus/admin-cli/client"
        "github.com/apache/incubator-pegasus/admin-cli/executor"
+       "github.com/apache/incubator-pegasus/admin-cli/executor/toolkits"
        "github.com/apache/incubator-pegasus/admin-cli/util"
 )
 
@@ -57,7 +58,7 @@ func (m *MigratorNode) downgradeAllReplicaToSecondary(client 
*executor.Client) {
        for {
                err := migrator.MigratePrimariesOut(client.Meta, m.node)
                if err != nil {
-                       logDebug(fmt.Sprintf("migrate primary out of %s is 
invalid now, err = %s\n", m.String(), err))
+                       toolkits.LogDebug(fmt.Sprintf("migrate primary out of 
%s is invalid now, err = %s\n", m.String(), err))
                        time.Sleep(10 * time.Second)
                        continue
                }
@@ -75,23 +76,23 @@ func (m *MigratorNode) 
downgradeAllReplicaToSecondary(client *executor.Client) {
 func (m *MigratorNode) checkIfNoPrimary(client *executor.Client) bool {
        tables, err := client.Meta.ListAvailableApps()
        if err != nil {
-               logDebug(fmt.Sprintf("migrate primary out of %s is invalid when 
list app, err = %s", m.String(), err))
+               toolkits.LogDebug(fmt.Sprintf("migrate primary out of %s is 
invalid when list app, err = %s", m.String(), err))
                return false
        }
 
        for _, tb := range tables {
                partitions, err := migrator.ListPrimariesOnNode(client.Meta, 
m.node, tb.AppName)
                if err != nil {
-                       logDebug(fmt.Sprintf("migrate primary out of %s is 
invalid when list primaries, err = %s", m.String(), err))
+                       toolkits.LogDebug(fmt.Sprintf("migrate primary out of 
%s is invalid when list primaries, err = %s", m.String(), err))
                        return false
                }
                if len(partitions) > 0 {
-                       logDebug(fmt.Sprintf("migrate primary out of %s is not 
completed, current count = %d", m.String(), len(partitions)))
+                       toolkits.LogDebug(fmt.Sprintf("migrate primary out of 
%s is not completed, current count = %d", m.String(), len(partitions)))
                        return false
                }
        }
 
-       logInfo(fmt.Sprintf("migrate primary out of %s successfully", 
m.String()))
+       toolkits.LogInfo(fmt.Sprintf("migrate primary out of %s successfully", 
m.String()))
        return true
 }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to