This is an automated email from the ASF dual-hosted git repository.

yuchenhe pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git


The following commit(s) were added to refs/heads/master by this push:
     new dbf096f7 feat(admin-cli): support nodes capacity balance using 
admin-cli (#969)
dbf096f7 is described below

commit dbf096f755d693ae97344c87c5a9424fca152bb6
Author: Jiashuo <[email protected]>
AuthorDate: Wed Jun 1 10:25:58 2022 +0800

    feat(admin-cli): support nodes capacity balance using admin-cli (#969)
---
 admin-cli/cmd/nodes_balancer.go                    |  42 ++++
 admin-cli/executor/disk_info.go                    |   2 +-
 .../diskbalancer/{helper.go => migrator.go}        |   4 +-
 .../executor/toolkits/nodesbalancer/balancer.go    | 132 +++++++++++
 .../executor/toolkits/nodesbalancer/migrator.go    | 253 +++++++++++++++++++++
 5 files changed, 430 insertions(+), 3 deletions(-)

diff --git a/admin-cli/cmd/nodes_balancer.go b/admin-cli/cmd/nodes_balancer.go
new file mode 100644
index 00000000..7188a66f
--- /dev/null
+++ b/admin-cli/cmd/nodes_balancer.go
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package cmd
+
+import (
+       
"github.com/apache/incubator-pegasus/admin-cli/executor/toolkits/nodesbalancer"
+       "github.com/apache/incubator-pegasus/admin-cli/shell"
+       "github.com/desertbit/grumble"
+)
+
+func init() {
+       shell.AddCommand(&grumble.Command{
+               Name: "nodes-balancer",
+               Help: "migrate replica among the replica server to balance the 
capacity of cluster, please " +
+                       "make sure the server config is right, detail see 
https://github.com/apache/incubator-pegasus/pull/969";,
+               Flags: func(a *grumble.Flags) {
+                       a.BoolL("auto", false, "whether to migrate replica 
until all nodes is balanced, false "+
+                               "by default, which means it just migrate one 
replica")
+               },
+               Run: func(c *grumble.Context) error {
+                       auto := c.Flags.Bool("auto")
+                       return nodesbalancer.BalanceNodeCapacity(pegasusClient, 
auto)
+               },
+       })
+}
diff --git a/admin-cli/executor/disk_info.go b/admin-cli/executor/disk_info.go
index 45fe3876..a7793bb4 100644
--- a/admin-cli/executor/disk_info.go
+++ b/admin-cli/executor/disk_info.go
@@ -225,7 +225,7 @@ func ConvertReplicaCapacityStruct(replicaCapacityInfos 
[]interface{}) ([]Replica
                }
        }
        if replicas == nil {
-               return nil, fmt.Errorf("the disk has no replica")
+               return []ReplicaCapacityStruct{}, nil
        }
        return replicas, nil
 }
diff --git a/admin-cli/executor/toolkits/diskbalancer/helper.go 
b/admin-cli/executor/toolkits/diskbalancer/migrator.go
similarity index 98%
rename from admin-cli/executor/toolkits/diskbalancer/helper.go
rename to admin-cli/executor/toolkits/diskbalancer/migrator.go
index 55ba7494..ead7f4fa 100644
--- a/admin-cli/executor/toolkits/diskbalancer/helper.go
+++ b/admin-cli/executor/toolkits/diskbalancer/migrator.go
@@ -68,7 +68,7 @@ func changeDiskCleanerInterval(client *executor.Client, 
replicaServer string, cl
 }
 
 func getNextMigrateAction(client *executor.Client, replicaServer string, 
minSize int64) (*MigrateAction, error) {
-       disks, totalUsage, totalCapacity, err := queryDiskCapacityInfo(client, 
replicaServer)
+       disks, totalUsage, totalCapacity, err := QueryDiskCapacityInfo(client, 
replicaServer)
        if err != nil {
                return nil, err
        }
@@ -84,7 +84,7 @@ func getNextMigrateAction(client *executor.Client, 
replicaServer string, minSize
        return migrateAction, nil
 }
 
-func queryDiskCapacityInfo(client *executor.Client, replicaServer string) 
([]executor.DiskCapacityStruct, int64, int64, error) {
+func QueryDiskCapacityInfo(client *executor.Client, replicaServer string) 
([]executor.DiskCapacityStruct, int64, int64, error) {
        diskCapacityOnNode, err := executor.GetDiskInfo(client, 
executor.CapacitySize, replicaServer, "", "", false)
        if err != nil {
                return nil, 0, 0, err
diff --git a/admin-cli/executor/toolkits/nodesbalancer/balancer.go 
b/admin-cli/executor/toolkits/nodesbalancer/balancer.go
new file mode 100644
index 00000000..207874c4
--- /dev/null
+++ b/admin-cli/executor/toolkits/nodesbalancer/balancer.go
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package nodesbalancer
+
+import (
+       "fmt"
+       "time"
+
+       "github.com/apache/incubator-pegasus/admin-cli/executor"
+       "github.com/apache/incubator-pegasus/admin-cli/executor/toolkits"
+       "github.com/apache/incubator-pegasus/go-client/session"
+)
+
+// By default, the server refreshes its node capacity statistics every 10 minutes.
+// Therefore, after a partition migration completes, the tool cannot immediately
+// obtain the latest capacity distribution. To speed up balancing, shorten the
+// node capacity update interval of the server. The relevant configurations are
+// as follows:
+//
+//- disk_stat_interval_seconds = 600
+//+ disk_stat_interval_seconds = 60 # or less
+//
+//- gc_memory_replica_interval_ms = 600000
+//+ gc_memory_replica_interval_ms = 60000 # or less
+
+func BalanceNodeCapacity(client *executor.Client, auto bool) error {
+       err := initClusterEnv(client)
+       if err != nil {
+               return err
+       }
+
+       balancer := &Migrator{}
+       for {
+               err := balancer.updateNodesLoad(client)
+               if err != nil {
+                       toolkits.LogInfo(fmt.Sprintf("retry update load, err = 
%s", err.Error()))
+                       time.Sleep(time.Second * 10)
+                       continue
+               }
+
+               action, err := balancer.selectNextAction(client)
+               if err != nil {
+                       return err
+               }
+
+               err = client.Meta.Balance(action.replica.Gpid, 
action.replica.Status, action.from.Node, action.to.Node)
+               if err != nil {
+                       return fmt.Errorf("migrate action[%s] now is invalid: 
%s", action.toString(), err.Error())
+               }
+               err = waitCompleted(client, action)
+               if err != nil {
+                       return fmt.Errorf("wait replica migrate err: %s", 
err.Error())
+               }
+               if !auto {
+                       break
+               }
+               time.Sleep(time.Second * 10)
+       }
+       err = resetClusterEnv(client)
+       if err != nil {
+               return err
+       }
+       return nil
+}
+
+func initClusterEnv(client *executor.Client) error {
+       toolkits.LogWarn("This cluster will be balanced based capacity, please 
don't open count-balance in later")
+       time.Sleep(time.Second * 3)
+
+       // set meta level as steady
+       err := executor.SetMetaLevel(client, "steady")
+       if err != nil {
+               return err
+       }
+       // disable migrate replica base `lively`
+       toolkits.LogInfo("set meta.lb.only_move_primary true")
+       err = executor.RemoteCommand(client, session.NodeTypeMeta, "", 
"meta.lb.only_move_primary", []string{"true"})
+       if err != nil {
+               return err
+       }
+       toolkits.LogInfo("set meta.lb.only_primary_balancer true")
+       err = executor.RemoteCommand(client, session.NodeTypeMeta, "", 
"meta.lb.only_primary_balancer", []string{"true"})
+       if err != nil {
+               return err
+       }
+       // reset garbage replica clear interval
+       toolkits.LogInfo("set gc_disk_error_replica_interval_seconds 10")
+       err = executor.ConfigCommand(client, session.NodeTypeReplica, "", 
"gc_disk_error_replica_interval_seconds", "set", "10")
+       if err != nil {
+               return err
+       }
+       toolkits.LogInfo("set gc_disk_garbage_replica_interval_seconds 10")
+       err = executor.ConfigCommand(client, session.NodeTypeReplica, "", 
"gc_disk_garbage_replica_interval_seconds", "set", "10")
+       if err != nil {
+               return err
+       }
+       return nil
+}
+
+// resetClusterEnv sets the two gc_disk_*_replica_interval_seconds configs on
+// the replica servers back to "3600", which this tool treats as the default
+// (NOTE(review): confirm 3600 matches the server's real default). It stops at
+// the first failing command and returns its error.
+func resetClusterEnv(client *executor.Client) error {
+       toolkits.LogWarn("This cluster garbage interval will be reset default")
+
+       // Restore the garbage replica cleanup intervals one by one.
+       for _, key := range []string{"gc_disk_error_replica_interval_seconds", "gc_disk_garbage_replica_interval_seconds"} {
+               toolkits.LogInfo("set " + key + " 3600")
+               if err := executor.ConfigCommand(client, session.NodeTypeReplica, "", key, "set", "3600"); err != nil {
+                       return err
+               }
+       }
+       return nil
+}
diff --git a/admin-cli/executor/toolkits/nodesbalancer/migrator.go 
b/admin-cli/executor/toolkits/nodesbalancer/migrator.go
new file mode 100644
index 00000000..1024de69
--- /dev/null
+++ b/admin-cli/executor/toolkits/nodesbalancer/migrator.go
@@ -0,0 +1,253 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package nodesbalancer
+
+import (
+       "fmt"
+       "math"
+       "time"
+
+       migrator "github.com/apache/incubator-pegasus/admin-cli/client"
+       "github.com/apache/incubator-pegasus/admin-cli/executor"
+       "github.com/apache/incubator-pegasus/admin-cli/executor/toolkits"
+       
"github.com/apache/incubator-pegasus/admin-cli/executor/toolkits/diskbalancer"
+       "github.com/apache/incubator-pegasus/admin-cli/util"
+       "github.com/apache/incubator-pegasus/go-client/idl/base"
+)
+
+// NodesCapacity is the capacity snapshot of one replica server, as collected
+// by Migrator.updateNodesLoad.
+// NOTE(review): the unit of the size fields is not visible here; log messages
+// divide by 1024 and print "GB", so presumably MB - confirm.
+type NodesCapacity struct {
+       // Node is the replica server this snapshot belongs to.
+       Node      *util.PegasusNode `json:"node"`
+       // Disks holds the per-disk capacity info returned by
+       // diskbalancer.QueryDiskCapacityInfo for this node.
+       Disks     []executor.DiskCapacityStruct
+       // Total is the node's total capacity.
+       Total     int64 `json:"total"`
+       // Usage is the node's used capacity.
+       Usage     int64 `json:"usage"`
+       // Available is Total minus Usage.
+       Available int64 `json:"available"`
+}
+
+// NodesReplica pairs a replica server with a list of replica capacity records.
+// NOTE(review): nothing in this file references the type - confirm it is still
+// needed.
+type NodesReplica struct {
+       Node     *util.PegasusNode
+       Replicas []*executor.ReplicaCapacityStruct
+}
+
+// Migrator keeps the cluster-wide capacity picture used to choose the next
+// replica migration. All fields are filled in by updateNodesLoad.
+type Migrator struct {
+       // CapacityLoad lists the per-node snapshots after sorting by the "Usage"
+       // field; selectNextAction treats the first entry as the low node and the
+       // last entry as the high node.
+       CapacityLoad []NodesCapacity
+       // Total and Usage are the sums of the per-node Total and Usage values;
+       // Average is Usage divided by the number of nodes.
+       Total, Usage, Average int64
+}
+
+// reset clears every piece of state derived from a previous load collection:
+// the per-node snapshots as well as the aggregated counters. Dropping
+// CapacityLoad matters because updateNodesLoad appends to it; without that,
+// stale snapshots from earlier rounds would stay at the front of the slice and
+// be picked as the low node.
+func (m *Migrator) reset() {
+       m.CapacityLoad = nil
+       m.Total = 0
+       m.Average = 0
+       m.Usage = 0
+}
+
+// updateNodesLoad rebuilds the migrator's view of the cluster. It lists the
+// nodes known to the meta server, queries the disk capacity of each one, sorts
+// the resulting snapshots by their "Usage" field and recomputes Total, Usage
+// and Average.
+//
+// It returns the first error from listing nodes or querying a node, and an
+// error when the meta server reports no nodes at all (Average would otherwise
+// divide by zero and callers would index an empty slice). On error the
+// previously stored state is left untouched.
+func (m *Migrator) updateNodesLoad(client *executor.Client) error {
+       nodes, err := client.Meta.ListNodes()
+       if err != nil {
+               return err
+       }
+
+       // util.SortStructsByField works on []interface{}, hence the boxed slice.
+       var nodesLoad []interface{}
+       for _, node := range nodes {
+               pegasusNode := client.Nodes.MustGetReplica(node.Address.GetAddress())
+               disksLoad, totalUsage, totalCapacity, queryErr := diskbalancer.QueryDiskCapacityInfo(client, pegasusNode.TCPAddr())
+               if queryErr != nil {
+                       return queryErr
+               }
+               nodesLoad = append(nodesLoad, NodesCapacity{
+                       Node:      pegasusNode,
+                       Disks:     disksLoad,
+                       Total:     totalCapacity,
+                       Usage:     totalUsage,
+                       Available: totalCapacity - totalUsage,
+               })
+       }
+       if len(nodesLoad) == 0 {
+               return fmt.Errorf("no replica node found to collect capacity load")
+       }
+
+       m.reset()
+       util.SortStructsByField(nodesLoad, "Usage")
+       // Build a fresh slice and assign it, so snapshots from earlier rounds can
+       // never leak into the new result.
+       capacityLoad := make([]NodesCapacity, 0, len(nodesLoad))
+       for _, item := range nodesLoad {
+               load := item.(NodesCapacity)
+               capacityLoad = append(capacityLoad, load)
+               m.Total += load.Total
+               m.Usage += load.Usage
+       }
+       m.CapacityLoad = capacityLoad
+       m.Average = m.Usage / int64(len(nodesLoad))
+       return nil
+}
+
+// partition identifies the replica chosen for migration.
+type partition struct {
+       // Gpid is the replica's global partition id.
+       Gpid   *base.Gpid
+       // Status is the balance type passed to client.Meta.Balance: copy-primary
+       // when the replica is a primary, copy-secondary otherwise.
+       Status migrator.BalanceType
+       // Size is the replica size. NOTE(review): selectNextAction never sets
+       // this field in the code shown here.
+       Size   int64
+}
+
+// ActionProposal describes one planned migration: which replica to move, and
+// the source and destination nodes.
+type ActionProposal struct {
+       // replica is the partition replica to migrate.
+       replica *partition
+       // from is the node the replica leaves; to is the node that receives it.
+       from, to *NodesCapacity
+}
+
+// toString renders the proposal as "[status]gpid:from=>to" for log and error
+// messages.
+func (act *ActionProposal) toString() string {
+       status := act.replica.Status.String()
+       gpid := act.replica.Gpid.String()
+       source := act.from.Node.String()
+       target := act.to.Node.String()
+       return fmt.Sprintf("[%s]%s:%s=>%s", status, gpid, source, target)
+}
+
+// selectNextAction chooses the next replica to migrate from the node with the
+// highest usage (last entry of CapacityLoad) to the node with the lowest usage
+// (first entry).
+//
+// Candidates come from the last disk in the high node's Disks list
+// (NOTE(review): presumably the most used disk - confirm the ordering produced
+// by QueryDiskCapacityInfo). A candidate is skipped when it is larger than the
+// amount that may be moved without pushing either node past the cluster
+// average, or when the low node already hosts a replica of the same partition.
+// When several candidates qualify, the last one in the list is chosen.
+//
+// It returns an error when no load has been collected, when capacity data is
+// unusable (no disks, or a non-positive total), when the usage ratios of the
+// two nodes differ by 5 percentage points or less, when replica lists cannot
+// be fetched, or when no candidate qualifies.
+func (m *Migrator) selectNextAction(client *executor.Client) (*ActionProposal, error) {
+       if len(m.CapacityLoad) == 0 {
+               return nil, fmt.Errorf("no node capacity load available to select a migrate action")
+       }
+       highNode := m.CapacityLoad[len(m.CapacityLoad)-1]
+       lowNode := m.CapacityLoad[0]
+
+       if len(highNode.Disks) == 0 {
+               return nil, fmt.Errorf("high node[%s] has no disk capacity info", highNode.Node.String())
+       }
+       highDiskOfHighNode := highNode.Disks[len(highNode.Disks)-1]
+       toolkits.LogInfo(fmt.Sprintf("expect_average = %dGB, high node = %s[%s][usage=%dGB], low node = %s[usage=%dGB]\n",
+               m.Average/1024, highNode.Node.String(), highDiskOfHighNode.Disk, highNode.Usage/1024, lowNode.Node.String(), lowNode.Usage/1024))
+
+       // Guard the ratio computation: integer division by zero panics.
+       if lowNode.Total <= 0 || highNode.Total <= 0 {
+               return nil, fmt.Errorf("invalid total capacity: high node[%s] = %d, low node[%s] = %d",
+                       highNode.Node.String(), highNode.Total, lowNode.Node.String(), lowNode.Total)
+       }
+       lowUsageRatio := lowNode.Usage * 100 / lowNode.Total
+       highUsageRatio := highNode.Usage * 100 / highNode.Total
+
+       if highUsageRatio-lowUsageRatio <= 5 {
+               return nil, fmt.Errorf("high node and low node has little diff: %d vs %d", highUsageRatio, lowUsageRatio)
+       }
+
+       // Moving more than this would push one of the two nodes past the average.
+       sizeAllowMoved := math.Min(float64(highNode.Usage-m.Average), float64(m.Average-lowNode.Usage))
+       highDiskReplicasOfHighNode, err := getDiskReplicas(client, &highNode, highDiskOfHighNode.Disk)
+       if err != nil {
+               return nil, fmt.Errorf("get high node[%s] high disk[%s] replicas err: %s", highNode.Node.String(), highDiskOfHighNode.Disk, err.Error())
+       }
+
+       totalReplicasOfLowNode, err := getNodeReplicas(client, &lowNode)
+       if err != nil {
+               return nil, fmt.Errorf("get low node[%s] replicas err: %s", lowNode.Node.String(), err.Error())
+       }
+
+       var selectReplica executor.ReplicaCapacityStruct
+       for _, replica := range highDiskReplicasOfHighNode {
+               if replica.Size > int64(sizeAllowMoved) {
+                       toolkits.LogDebug(fmt.Sprintf("select next replica for the replica is too large(replica_size > allow_size): %d > %f", replica.Size, sizeAllowMoved))
+                       continue
+               }
+
+               if totalReplicasOfLowNode.contain(replica.Gpid) {
+                       toolkits.LogDebug(fmt.Sprintf("select next replica for the replica(%s) is has existed target node(%s)", replica.Gpid, lowNode.Node.String()))
+                       continue
+               }
+
+               // No break on purpose: the last qualifying replica wins.
+               selectReplica = replica
+       }
+
+       // An empty gpid means the zero value was never overwritten.
+       if selectReplica.Gpid == "" {
+               return nil, fmt.Errorf("can't find valid replica to balance")
+       }
+
+       gpid, err := util.Str2Gpid(selectReplica.Gpid)
+       if err != nil {
+               return nil, err
+       }
+
+       status := migrator.BalanceCopySec
+       if selectReplica.Status == "primary" {
+               status = migrator.BalanceCopyPri
+       }
+       return &ActionProposal{
+               replica: &partition{
+                       Gpid:   gpid,
+                       Status: status,
+                       Size:   selectReplica.Size,
+               },
+               from: &highNode,
+               to:   &lowNode,
+       }, nil
+}
+
+// replicas is a list of replica capacity records that can be searched by gpid.
+type replicas []executor.ReplicaCapacityStruct
+
+// contain reports whether any record in r has a Gpid equal to selectReplica.
+func (r replicas) contain(selectReplica string) bool {
+       for i := 0; i < len(r); i++ {
+               if r[i].Gpid != selectReplica {
+                       continue
+               }
+               return true
+       }
+       return false
+}
+
+// getDiskReplicas fetches the capacity info of the disk tagged diskTag on
+// replicaServer and converts it into a replicas list. Any error from the query
+// or the conversion is returned unchanged together with a nil list.
+func getDiskReplicas(client *executor.Client, replicaServer *NodesCapacity, diskTag string) (replicas, error) {
+       rawInfo, err := executor.GetDiskInfo(client, executor.CapacitySize, replicaServer.Node.TCPAddr(), "", diskTag, false)
+       if err != nil {
+               return nil, err
+       }
+
+       converted, err := executor.ConvertReplicaCapacityStruct(rawInfo)
+       if err != nil {
+               return nil, err
+       }
+       return converted, nil
+}
+
+// getNodeReplicas collects the capacity info of every disk listed in
+// replicaServer.Disks, one query per disk tag, and converts the combined
+// result into a single replicas list. It stops at the first failing query and
+// returns that error with a nil list.
+func getNodeReplicas(client *executor.Client, replicaServer *NodesCapacity) (replicas, error) {
+       addr := replicaServer.Node.TCPAddr()
+
+       var merged []interface{}
+       for i := range replicaServer.Disks {
+               perDisk, err := executor.GetDiskInfo(client, executor.CapacitySize, addr, "", replicaServer.Disks[i].Disk, false)
+               if err != nil {
+                       return nil, err
+               }
+               merged = append(merged, perDisk...)
+       }
+
+       converted, err := executor.ConvertReplicaCapacityStruct(merged)
+       if err != nil {
+               return nil, err
+       }
+       return converted, nil
+}
+
+func waitCompleted(client *executor.Client, action *ActionProposal) error {
+       for {
+               replicas, err := getNodeReplicas(client, action.to)
+               if err != nil {
+                       toolkits.LogInfo(err.Error())
+                       time.Sleep(time.Second * 10)
+                       continue
+               }
+
+               if !replicas.contain(fmt.Sprintf("%d.%d", 
action.replica.Gpid.Appid,
+                       action.replica.Gpid.PartitionIndex)) {
+                       toolkits.LogInfo(fmt.Sprintf("%s is running", 
action.toString()))
+                       time.Sleep(time.Second * 10)
+                       continue
+               }
+               break
+       }
+       toolkits.LogInfo(fmt.Sprintf("%s is completed and wait 100s to wait gc 
garbage", action.toString()))
+       // set meta level as lively to clean garbage
+       err := executor.SetMetaLevel(client, "lively")
+       if err != nil {
+               return err
+       }
+       // recover  meta level as steady to next action
+       time.Sleep(time.Second * 100)
+       err = executor.SetMetaLevel(client, "steady")
+       if err != nil {
+               return err
+       }
+       fmt.Println()
+       return nil
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to