This is an automated email from the ASF dual-hosted git repository.
yuchenhe pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git
The following commit(s) were added to refs/heads/master by this push:
new 5278a669 refactor(admin-cli): move disk-balance code into `toolkits`
package (#958)
5278a669 is described below
commit 5278a669723e1c90e43d104c6207c258cbf15fa5
Author: Jiashuo <[email protected]>
AuthorDate: Fri Apr 29 16:29:12 2022 +0800
refactor(admin-cli): move disk-balance code into `toolkits` package (#958)
---
admin-cli/README.md | 2 +-
admin-cli/cmd/disk_balance.go | 3 +-
admin-cli/executor/disk_info.go | 28 ++-
admin-cli/executor/disk_migrate.go | 61 +++++
admin-cli/executor/partition_split.go | 2 +-
.../executor/toolkits/diskbalancer/balancer.go | 107 ++++++++
.../diskbalancer/helper.go} | 272 ++++++---------------
.../executor/toolkits/{nodesmigrator => }/log.go | 10 +-
.../executor/toolkits/nodesmigrator/manager.go | 9 +-
.../executor/toolkits/nodesmigrator/migrator.go | 27 +-
admin-cli/executor/toolkits/nodesmigrator/nodes.go | 11 +-
11 files changed, 298 insertions(+), 234 deletions(-)
diff --git a/admin-cli/README.md b/admin-cli/README.md
index f763510a..3db5d255 100644
--- a/admin-cli/README.md
+++ b/admin-cli/README.md
@@ -18,7 +18,7 @@ under the License.
-->
# admin-cli
-[](https://github.com/apache/incubator-pegasus/actions/workflows/pull-request-go.yml)
+[](https://github.com/apache/incubator-pegasus/actions/workflows/lint_and_test_admin-cli.yml)
The command-line tool for the administration of Pegasus.
diff --git a/admin-cli/cmd/disk_balance.go b/admin-cli/cmd/disk_balance.go
index f08f7aed..2a9647ad 100644
--- a/admin-cli/cmd/disk_balance.go
+++ b/admin-cli/cmd/disk_balance.go
@@ -21,6 +21,7 @@ package cmd
import (
"github.com/apache/incubator-pegasus/admin-cli/executor"
+
"github.com/apache/incubator-pegasus/admin-cli/executor/toolkits/diskbalancer"
"github.com/apache/incubator-pegasus/admin-cli/shell"
"github.com/desertbit/grumble"
@@ -60,7 +61,7 @@ func init() {
f.Int("i", "interval", 650, "wait disk clean garbage
replica interval")
},
Run: func(c *grumble.Context) error {
- return executor.DiskBalance(pegasusClient,
c.Flags.String("node"), c.Flags.Int64("size"), c.Flags.Int("interval"),
c.Flags.Bool("auto"))
+ return diskbalancer.BalanceDiskCapacity(pegasusClient,
c.Flags.String("node"), c.Flags.Int64("size"), c.Flags.Int("interval"),
c.Flags.Bool("auto"))
},
})
diff --git a/admin-cli/executor/disk_info.go b/admin-cli/executor/disk_info.go
index 8f33dd73..24ad8f6a 100644
--- a/admin-cli/executor/disk_info.go
+++ b/admin-cli/executor/disk_info.go
@@ -40,11 +40,11 @@ const (
// QueryDiskInfo command
func QueryDiskInfo(client *Client, infoType DiskInfoType, replicaServer
string, tableName string, diskTag string) error {
- _, err := queryDiskInfo(client, infoType, replicaServer, tableName,
diskTag, true)
+ _, err := GetDiskInfo(client, infoType, replicaServer, tableName,
diskTag, true)
return err
}
-func queryDiskInfo(client *Client, infoType DiskInfoType, replicaServer
string, tableName string, diskTag string, print bool) ([]interface{}, error) {
+func GetDiskInfo(client *Client, infoType DiskInfoType, replicaServer string,
tableName string, diskTag string, print bool) ([]interface{}, error) {
resp, err := sendQueryDiskInfoRequest(client, replicaServer, tableName)
if err != nil {
return nil, err
@@ -52,9 +52,9 @@ func queryDiskInfo(client *Client, infoType DiskInfoType,
replicaServer string,
switch infoType {
case CapacitySize:
- return queryDiskCapacity(client, replicaServer, resp, diskTag,
print), nil
+ return fillDiskCapacity(client, replicaServer, resp, diskTag,
print), nil
case ReplicaCount:
- return queryDiskReplicaCount(client, resp, print), nil
+ return fillDiskReplicaCount(client, resp, print), nil
default:
return nil, fmt.Errorf("not support query this disk info: %s",
infoType)
}
@@ -110,7 +110,7 @@ type ReplicaCapacityStruct struct {
Size int64 `json:"size"`
}
-func queryDiskCapacity(client *Client, replicaServer string, resp
*radmin.QueryDiskInfoResponse, diskTag string, print bool) []interface{} {
+func fillDiskCapacity(client *Client, replicaServer string, resp
*radmin.QueryDiskInfoResponse, diskTag string, print bool) []interface{} {
var diskCapacityInfos []interface{}
var replicaCapacityInfos []interface{}
@@ -155,7 +155,7 @@ func queryDiskCapacity(client *Client, replicaServer
string, resp *radmin.QueryD
return diskCapacityInfos
}
-func queryDiskReplicaCount(client *Client, resp *radmin.QueryDiskInfoResponse,
print bool) []interface{} {
+func fillDiskReplicaCount(client *Client, resp *radmin.QueryDiskInfoResponse,
print bool) []interface{} {
type ReplicaCountStruct struct {
Disk string `json:"disk"`
Primary int `json:"primary"`
@@ -213,3 +213,19 @@ func AddDisk(client *Client, replicaServer string, diskStr
string) error {
fmt.Printf("Node[%s] add new disk succeed\n", replicaServer)
return nil
}
+
+func ConvertReplicaCapacityStruct(replicaCapacityInfos []interface{})
([]ReplicaCapacityStruct, error) {
+ util.SortStructsByField(replicaCapacityInfos, "Size")
+ var replicas []ReplicaCapacityStruct
+ for _, replica := range replicaCapacityInfos {
+ if r, ok := replica.(ReplicaCapacityStruct); ok {
+ replicas = append(replicas, r)
+ } else {
+ return nil, fmt.Errorf("can't covert to
ReplicaCapacityStruct")
+ }
+ }
+ if replicas == nil {
+ return nil, fmt.Errorf("the disk has no replica")
+ }
+ return replicas, nil
+}
diff --git a/admin-cli/executor/disk_migrate.go
b/admin-cli/executor/disk_migrate.go
new file mode 100644
index 00000000..4da4b01e
--- /dev/null
+++ b/admin-cli/executor/disk_migrate.go
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package executor
+
+import (
+ "context"
+ "fmt"
+ "time"
+
+ "github.com/XiaoMi/pegasus-go-client/idl/radmin"
+ "github.com/XiaoMi/pegasus-go-client/session"
+ "github.com/apache/incubator-pegasus/admin-cli/util"
+)
+
+func DiskMigrate(client *Client, replicaServer string, pidStr string, from
string, to string) error {
+ ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
+ defer cancel()
+
+ pid, err := util.Str2Gpid(pidStr)
+ if err != nil {
+ return err
+ }
+
+ node, err := client.Nodes.GetNode(replicaServer,
session.NodeTypeReplica)
+ if err != nil {
+ return err
+ }
+ replica := node.Replica()
+
+ resp, err := replica.DiskMigrate(ctx, &radmin.ReplicaDiskMigrateRequest{
+ Pid: pid,
+ OriginDisk: from,
+ TargetDisk: to,
+ })
+
+ if err != nil {
+ if resp != nil && resp.Hint != nil {
+ return fmt.Errorf("Internal server error [%s:%s]", err,
*resp.Hint)
+ }
+ return err
+ }
+
+ return nil
+}
diff --git a/admin-cli/executor/partition_split.go
b/admin-cli/executor/partition_split.go
index 307d13a4..10bab4d9 100644
--- a/admin-cli/executor/partition_split.go
+++ b/admin-cli/executor/partition_split.go
@@ -134,7 +134,7 @@ func DiskBeforeSplit(client *Client, tableName string)
error {
for address, resp := range respMap {
for _, diskInfo := range resp.GetDiskInfos() {
diskTag := diskInfo.GetTag()
- rCapacityList, err :=
convertReplicaCapacityStruct(queryDiskCapacity(client, address, resp, diskTag,
false))
+ rCapacityList, err :=
ConvertReplicaCapacityStruct(fillDiskCapacity(client, address, resp, diskTag,
false))
if err != nil {
return fmt.Errorf("%s [hint: failed to get info
for node(%s) disk(%s) when disk check before split]", err, address, diskTag)
}
diff --git a/admin-cli/executor/toolkits/diskbalancer/balancer.go
b/admin-cli/executor/toolkits/diskbalancer/balancer.go
new file mode 100644
index 00000000..eca177c9
--- /dev/null
+++ b/admin-cli/executor/toolkits/diskbalancer/balancer.go
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package diskbalancer
+
+import (
+ "fmt"
+ "strings"
+ "time"
+
+ "github.com/XiaoMi/pegasus-go-client/idl/base"
+ "github.com/apache/incubator-pegasus/admin-cli/executor"
+ "github.com/apache/incubator-pegasus/admin-cli/executor/toolkits"
+)
+
+var WaitRunning = time.Second * 10 // time for wait migrate complete
+var WaitCleaning = time.Second * 90 // time for wait garbage replica to clean
complete
+
+var ShortCleanInterval = "1" // short time wait the disk cleaner clean
garbage replica
+var LongCleanInterval = "86400" // long time wait the disk cleaner clean
garbage replica
+
+// auto balance target node disk usage:
+// -1. change the pegasus server disk cleaner internal for clean temp replica
to free disk space in time
+// -2. get the optimal migrate action to be ready to balance the disk until
can't migrate base latest disk space stats
+// -3. if current replica is `primary` status, force assign the replica to
`secondary` status
+// -4. migrate the replica base `getNextMigrateAction` result
+// -5. loop query migrate progress using `DiskMigrate`, it will response
`ERR_BUSY` if running
+// -6. start next loop until can't allow to balance the node
+// -7. recover disk cleaner internal if balance complete
+// -8. set meta status to `lively` to balance primary and secondary //
TODO(jiashuo1)
+func BalanceDiskCapacity(client *executor.Client, replicaServer string,
minSize int64, interval int, auto bool) error {
+ WaitCleaning = time.Second * time.Duration(interval)
+
+ if err := changeDiskCleanerInterval(client, replicaServer,
ShortCleanInterval); err != nil {
+ return err
+ }
+ defer func() {
+ if err := changeDiskCleanerInterval(client, replicaServer,
LongCleanInterval); err != nil {
+ toolkits.LogWarn("revert disk cleaner failed")
+ }
+ }()
+
+ for {
+ action, err := getNextMigrateAction(client, replicaServer,
minSize)
+ if err != nil {
+ return err
+ }
+
+ err = executor.DiskMigrate(client, replicaServer,
action.replica.Gpid, action.from, action.to)
+ if err != nil && action.replica.Status != "secondary" {
+ err := forceAssignReplicaToSecondary(client,
replicaServer, action.replica.Gpid)
+ if err != nil {
+ return err
+ }
+ time.Sleep(WaitRunning)
+ continue
+ }
+
+ if err != nil {
+ return fmt.Errorf("migrate(%s) start failed[auto=%v]
err = %s", action.toString(), auto, err.Error())
+ }
+
+ toolkits.LogInfo(fmt.Sprintf("migrate(%s) has started[auto=%v],
wait complete...\n", action.toString(), auto))
+ for {
+ // TODO(jiashuo1): using DiskMigrate RPC to query
status, consider support queryDiskMigrateStatus RPC
+ err = executor.DiskMigrate(client, replicaServer,
action.replica.Gpid, action.from, action.to)
+ if err == nil {
+ time.Sleep(WaitRunning)
+ continue
+ }
+ if strings.Contains(err.Error(),
base.ERR_BUSY.String()) {
+ toolkits.LogInfo(fmt.Sprintf("migrate(%s) is
running, msg=%s, wait complete...\n", action.toString(), err.Error()))
+ time.Sleep(WaitRunning)
+ continue
+ }
+ toolkits.LogInfo(fmt.Sprintf("migrate(%s) is
completed,result=%s, wait[%ds] disk cleaner remove garbage...\n\n",
+ action.toString(), err.Error(), interval))
+ break
+ }
+
+ if auto {
+ time.Sleep(WaitCleaning)
+ continue
+ }
+
+ time.Sleep(WaitCleaning)
+ toolkits.LogInfo(fmt.Sprintf("you now disable
auto-balance[auto=%v], stop and wait manual loop\n\n", auto))
+ break
+ }
+ return nil
+}
diff --git a/admin-cli/executor/disk_balance.go
b/admin-cli/executor/toolkits/diskbalancer/helper.go
similarity index 56%
rename from admin-cli/executor/disk_balance.go
rename to admin-cli/executor/toolkits/diskbalancer/helper.go
index 82ac17ed..09096e90 100644
--- a/admin-cli/executor/disk_balance.go
+++ b/admin-cli/executor/toolkits/diskbalancer/helper.go
@@ -17,131 +17,23 @@
* under the License.
*/
-package executor
+package diskbalancer
import (
- "context"
"fmt"
"math"
- "strings"
- "time"
"github.com/XiaoMi/pegasus-go-client/idl/admin"
- "github.com/XiaoMi/pegasus-go-client/idl/base"
- "github.com/XiaoMi/pegasus-go-client/idl/radmin"
"github.com/XiaoMi/pegasus-go-client/session"
adminClient "github.com/apache/incubator-pegasus/admin-cli/client"
+ "github.com/apache/incubator-pegasus/admin-cli/executor"
+ "github.com/apache/incubator-pegasus/admin-cli/executor/toolkits"
"github.com/apache/incubator-pegasus/admin-cli/util"
)
-func DiskMigrate(client *Client, replicaServer string, pidStr string, from
string, to string) error {
- ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
- defer cancel()
-
- pid, err := util.Str2Gpid(pidStr)
- if err != nil {
- return err
- }
-
- node, err := client.Nodes.GetNode(replicaServer,
session.NodeTypeReplica)
- if err != nil {
- return err
- }
- replica := node.Replica()
-
- resp, err := replica.DiskMigrate(ctx, &radmin.ReplicaDiskMigrateRequest{
- Pid: pid,
- OriginDisk: from,
- TargetDisk: to,
- })
-
- if err != nil {
- if resp != nil && resp.Hint != nil {
- return fmt.Errorf("Internal server error [%s:%s]", err,
*resp.Hint)
- }
- return err
- }
-
- return nil
-}
-
-var WaitRunning = time.Second * 10 // time for wait migrate complete
-var WaitCleaning = time.Second * 90 // time for wait garbage replica to clean
complete
-
-// auto balance target node disk usage:
-// -1. change the pegasus server disk cleaner internal for clean temp replica
to free disk space in time
-// -2. get the optimal migrate action to be ready to balance the disk until
can't migrate base latest disk space stats
-// -3. if current replica is `primary` status, force assign the replica to
`secondary` status
-// -4. migrate the replica base `getNextMigrateAction` result
-// -5. loop query migrate progress using `DiskMigrate`, it will response
`ERR_BUSY` if running
-// -6. start next loop until can't allow to balance the node
-// -7. recover disk cleaner internal if balance complete
-// -8. set meta status to `lively` to balance primary and secondary //
TODO(jiashuo1)
-func DiskBalance(client *Client, replicaServer string, minSize int64, interval
int, auto bool) error {
- WaitCleaning = time.Second * time.Duration(interval)
-
- if err := changeDiskCleanerInterval(client, replicaServer, "1"); err !=
nil {
- return err
- }
- defer func() {
- if err := changeDiskCleanerInterval(client, replicaServer,
"86400"); err != nil {
- fmt.Println("revert disk cleaner failed")
- }
- }()
-
- for {
- action, err := getNextMigrateAction(client, replicaServer,
minSize)
- if err != nil {
- return err
- }
-
- err = DiskMigrate(client, replicaServer, action.replica.Gpid,
action.from, action.to)
- if err != nil && action.replica.Status != "secondary" {
- err := forceAssignReplicaToSecondary(client,
replicaServer, action.replica.Gpid)
- if err != nil {
- return err
- }
- time.Sleep(WaitRunning)
- continue
- }
-
- if err != nil {
- return fmt.Errorf("migrate(%s) start failed[auto=%v]
err = %s", action.toString(), auto, err.Error())
- }
-
- fmt.Printf("migrate(%s) has started[auto=%v], wait
complete...\n", action.toString(), auto)
- for {
- // TODO(jiashuo1): using DiskMigrate RPC to query
status, consider support queryDiskMigrateStatus RPC
- err = DiskMigrate(client, replicaServer,
action.replica.Gpid, action.from, action.to)
- if err == nil {
- time.Sleep(WaitRunning)
- continue
- }
- if strings.Contains(err.Error(),
base.ERR_BUSY.String()) {
- fmt.Printf("migrate(%s) is running, msg=%s,
wait complete...\n", action.toString(), err.Error())
- time.Sleep(WaitRunning)
- continue
- }
- fmt.Printf("migrate(%s) is completed,result=%s,
wait[%ds] disk cleaner remove garbage...\n\n",
- action.toString(), err.Error(), interval)
- break
- }
-
- if auto {
- time.Sleep(WaitCleaning)
- continue
- }
-
- time.Sleep(WaitCleaning)
- fmt.Printf("you now disable auto-balance[auto=%v], stop and
wait manual loop\n\n", auto)
- break
- }
- return nil
-}
-
type DiskStats struct {
- diskCapacity DiskCapacityStruct
- replicaCapacity []ReplicaCapacityStruct
+ diskCapacity executor.DiskCapacityStruct
+ replicaCapacity []executor.ReplicaCapacityStruct
}
type MigrateDisk struct {
@@ -159,7 +51,7 @@ func (m *MigrateDisk) toString() string {
type MigrateAction struct {
node string
- replica ReplicaCapacityStruct
+ replica executor.ReplicaCapacityStruct
from string
to string
}
@@ -168,14 +60,14 @@ func (m *MigrateAction) toString() string {
return fmt.Sprintf("node=%s, replica=%s, %s=>%s", m.node,
m.replica.Gpid, m.from, m.to)
}
-func changeDiskCleanerInterval(client *Client, replicaServer string,
cleanInterval string) error {
- fmt.Printf("set gc_disk_migration_origin_replica_interval_seconds = %ss
", cleanInterval)
- err := ConfigCommand(client, session.NodeTypeReplica, replicaServer,
+func changeDiskCleanerInterval(client *executor.Client, replicaServer string,
cleanInterval string) error {
+ toolkits.LogInfo(fmt.Sprintf("set
gc_disk_migration_origin_replica_interval_seconds = %ss ", cleanInterval))
+ err := executor.ConfigCommand(client, session.NodeTypeReplica,
replicaServer,
"gc_disk_migration_origin_replica_interval_seconds", "set",
cleanInterval)
return err
}
-func getNextMigrateAction(client *Client, replicaServer string, minSize int64)
(*MigrateAction, error) {
+func getNextMigrateAction(client *executor.Client, replicaServer string,
minSize int64) (*MigrateAction, error) {
disks, totalUsage, totalCapacity, err := queryDiskCapacityInfo(client,
replicaServer)
if err != nil {
return nil, err
@@ -192,73 +84,17 @@ func getNextMigrateAction(client *Client, replicaServer
string, minSize int64) (
return migrateAction, nil
}
-func forceAssignReplicaToSecondary(client *Client, replicaServer string, gpid
string) error {
- fmt.Printf("WARNING: the select replica is not secondary, will force
assign it secondary\n")
- if _, err :=
client.Meta.MetaControl(admin.MetaFunctionLevel_fl_steady); err != nil {
- return err
- }
- secondaryNode, err := getReplicaSecondaryNode(client, gpid)
- if err != nil {
- return err
- }
- replica, err := util.Str2Gpid(gpid)
- if err != nil {
- return err
- }
- return client.Meta.Balance(replica, adminClient.BalanceMovePri,
- util.NewNodeFromTCPAddr(replicaServer,
session.NodeTypeReplica), secondaryNode)
-}
-
-func getReplicaSecondaryNode(client *Client, gpid string) (*util.PegasusNode,
error) {
- replica, err := util.Str2Gpid(gpid)
- if err != nil {
- return nil, err
- }
- tables, err := client.Meta.ListApps(admin.AppStatus_AS_AVAILABLE)
- if err != nil {
- return nil, fmt.Errorf("can't get the table name of replica %s
when migrate the replica", gpid)
- }
- var tableName string
- for _, tb := range tables {
- if tb.AppID == replica.Appid {
- tableName = tb.AppName
- break
- }
- }
- if tableName == "" {
- return nil, fmt.Errorf("can't find the table for %s when
migrate the replica", gpid)
- }
-
- resp, err := client.Meta.QueryConfig(tableName)
- if err != nil {
- return nil, fmt.Errorf("can't get the table %s configuration
when migrate the replica(%s): %s",
- tableName, gpid, err)
- }
-
- var secondaryNode *util.PegasusNode
- for _, partition := range resp.Partitions {
- if partition.Pid.String() == replica.String() {
- secondaryNode =
util.NewNodeFromTCPAddr(partition.Secondaries[0].GetAddress(),
session.NodeTypeReplica)
- }
- }
-
- if secondaryNode == nil {
- return nil, fmt.Errorf("can't get the replica %s secondary
node", gpid)
- }
- return secondaryNode, nil
-}
-
-func queryDiskCapacityInfo(client *Client, replicaServer string)
([]DiskCapacityStruct, int64, int64, error) {
- diskCapacityOnNode, err := queryDiskInfo(client, CapacitySize,
replicaServer, "", "", false)
+func queryDiskCapacityInfo(client *executor.Client, replicaServer string)
([]executor.DiskCapacityStruct, int64, int64, error) {
+ diskCapacityOnNode, err := executor.GetDiskInfo(client,
executor.CapacitySize, replicaServer, "", "", false)
if err != nil {
return nil, 0, 0, err
}
util.SortStructsByField(diskCapacityOnNode, "Usage")
- var disks []DiskCapacityStruct
+ var disks []executor.DiskCapacityStruct
var totalUsage int64
var totalCapacity int64
for _, disk := range diskCapacityOnNode {
- if s, ok := disk.(DiskCapacityStruct); ok {
+ if s, ok := disk.(executor.DiskCapacityStruct); ok {
disks = append(disks, s)
totalUsage += s.Usage
totalCapacity += s.Capacity
@@ -277,15 +113,15 @@ func queryDiskCapacityInfo(client *Client, replicaServer
string) ([]DiskCapacity
return disks, totalUsage, totalCapacity, nil
}
-func getMigrateDiskInfo(client *Client, replicaServer string, disks
[]DiskCapacityStruct,
+func getMigrateDiskInfo(client *executor.Client, replicaServer string, disks
[]executor.DiskCapacityStruct,
totalUsage int64, totalCapacity int64) (*MigrateDisk, error) {
highUsageDisk := disks[len(disks)-1]
- highDiskInfo, err := queryDiskInfo(client, CapacitySize, replicaServer,
"", highUsageDisk.Disk, false)
+ highDiskInfo, err := executor.GetDiskInfo(client,
executor.CapacitySize, replicaServer, "", highUsageDisk.Disk, false)
if err != nil {
return nil, err
}
lowUsageDisk := disks[0]
- lowDiskInfo, err := queryDiskInfo(client, CapacitySize, replicaServer,
"", lowUsageDisk.Disk, false)
+ lowDiskInfo, err := executor.GetDiskInfo(client, executor.CapacitySize,
replicaServer, "", lowUsageDisk.Disk, false)
if err != nil {
return nil, err
}
@@ -305,13 +141,13 @@ func getMigrateDiskInfo(client *Client, replicaServer
string, disks []DiskCapaci
lowUsageDisk.Usage, lowUsageDisk.Ratio, averageUsage,
averageRatio)
}
- replicaCapacityOnHighDisk, err :=
convertReplicaCapacityStruct(highDiskInfo)
- if err != nil { // we need migrate replica from high disk, so the
convert must be successful
+ replicaCapacityOnHighDisk, err :=
executor.ConvertReplicaCapacityStruct(highDiskInfo)
+ if err != nil {
return nil, fmt.Errorf("parse replica info on high disk(%s)
failed: %s", highUsageDisk.Disk, err.Error())
}
- replicaCapacityOnLowDisk, err :=
convertReplicaCapacityStruct(lowDiskInfo)
- if err != nil { // we don't care the replica capacity info on low disk,
if convert failed, we only log warning and know the result=nil
- fmt.Printf("WARNING: parse replica info on low disk(%s) failed:
%s", lowUsageDisk.Disk, err.Error())
+ replicaCapacityOnLowDisk, err :=
executor.ConvertReplicaCapacityStruct(lowDiskInfo)
+ if err != nil {
+ return nil, fmt.Errorf("parse replica info on low disk(%s)
failed: %s", highUsageDisk.Disk, err.Error())
}
return &MigrateDisk{
averageUsage: averageUsage,
@@ -332,7 +168,7 @@ func computeMigrateAction(migrate *MigrateDisk, minSize
int64) (*MigrateAction,
highDiskCanSendMax := migrate.highDisk.diskCapacity.Usage -
migrate.averageUsage
sizeNeedMove := int64(math.Min(float64(lowDiskCanReceiveMax),
float64(highDiskCanSendMax)))
- var selectReplica *ReplicaCapacityStruct
+ var selectReplica *executor.ReplicaCapacityStruct
for i := len(migrate.highDisk.replicaCapacity) - 1; i >= 0; i-- {
if migrate.highDisk.replicaCapacity[i].Size > sizeNeedMove {
continue
@@ -367,18 +203,58 @@ func computeMigrateAction(migrate *MigrateDisk, minSize
int64) (*MigrateAction,
}, nil
}
-func convertReplicaCapacityStruct(replicaCapacityInfos []interface{})
([]ReplicaCapacityStruct, error) {
- util.SortStructsByField(replicaCapacityInfos, "Size")
- var replicas []ReplicaCapacityStruct
- for _, replica := range replicaCapacityInfos {
- if r, ok := replica.(ReplicaCapacityStruct); ok {
- replicas = append(replicas, r)
- } else {
- return nil, fmt.Errorf("can't covert to
ReplicaCapacityStruct")
+func forceAssignReplicaToSecondary(client *executor.Client, replicaServer
string, gpid string) error {
+ fmt.Printf("WARNING: the select replica is not secondary, will force
assign it secondary\n")
+ if _, err :=
client.Meta.MetaControl(admin.MetaFunctionLevel_fl_steady); err != nil {
+ return err
+ }
+ secondaryNode, err := getReplicaSecondaryNode(client, gpid)
+ if err != nil {
+ return err
+ }
+ replica, err := util.Str2Gpid(gpid)
+ if err != nil {
+ return err
+ }
+ return client.Meta.Balance(replica, adminClient.BalanceMovePri,
+ util.NewNodeFromTCPAddr(replicaServer,
session.NodeTypeReplica), secondaryNode)
+}
+
+func getReplicaSecondaryNode(client *executor.Client, gpid string)
(*util.PegasusNode, error) {
+ replica, err := util.Str2Gpid(gpid)
+ if err != nil {
+ return nil, err
+ }
+ tables, err := client.Meta.ListApps(admin.AppStatus_AS_AVAILABLE)
+ if err != nil {
+ return nil, fmt.Errorf("can't get the table name of replica %s
when migrate the replica", gpid)
+ }
+ var tableName string
+ for _, tb := range tables {
+ if tb.AppID == replica.Appid {
+ tableName = tb.AppName
+ break
+ }
+ }
+ if tableName == "" {
+ return nil, fmt.Errorf("can't find the table for %s when
migrate the replica", gpid)
+ }
+
+ resp, err := client.Meta.QueryConfig(tableName)
+ if err != nil {
+ return nil, fmt.Errorf("can't get the table %s configuration
when migrate the replica(%s): %s",
+ tableName, gpid, err)
+ }
+
+ var secondaryNode *util.PegasusNode
+ for _, partition := range resp.Partitions {
+ if partition.Pid.String() == replica.String() {
+ secondaryNode =
util.NewNodeFromTCPAddr(partition.Secondaries[0].GetAddress(),
session.NodeTypeReplica)
}
}
- if replicas == nil {
- return nil, fmt.Errorf("the disk has no replica")
+
+ if secondaryNode == nil {
+ return nil, fmt.Errorf("can't get the replica %s secondary
node", gpid)
}
- return replicas, nil
+ return secondaryNode, nil
}
diff --git a/admin-cli/executor/toolkits/nodesmigrator/log.go
b/admin-cli/executor/toolkits/log.go
similarity index 88%
rename from admin-cli/executor/toolkits/nodesmigrator/log.go
rename to admin-cli/executor/toolkits/log.go
index 71fabd75..4ac2d85a 100644
--- a/admin-cli/executor/toolkits/nodesmigrator/log.go
+++ b/admin-cli/executor/toolkits/log.go
@@ -17,7 +17,7 @@
* under the License.
*/
-package nodesmigrator
+package toolkits
import (
"fmt"
@@ -25,21 +25,21 @@ import (
"github.com/sirupsen/logrus"
)
-func logInfo(log string) {
+func LogInfo(log string) {
fmt.Printf("INFO: %s\n", log)
logrus.Info(log)
}
-func logWarn(log string) {
+func LogWarn(log string) {
fmt.Printf("WARN: %s\n", log)
logrus.Warn(log)
}
-func logDebug(log string) {
+func LogDebug(log string) {
logrus.Debug(log)
}
-func logPanic(log string) {
+func LogPanic(log string) {
fmt.Println(log)
logrus.Panic(log)
}
diff --git a/admin-cli/executor/toolkits/nodesmigrator/manager.go
b/admin-cli/executor/toolkits/nodesmigrator/manager.go
index 6a5c9088..84f8934f 100644
--- a/admin-cli/executor/toolkits/nodesmigrator/manager.go
+++ b/admin-cli/executor/toolkits/nodesmigrator/manager.go
@@ -28,6 +28,7 @@ import (
"github.com/XiaoMi/pegasus-go-client/idl/admin"
"github.com/apache/incubator-pegasus/admin-cli/executor"
+ "github.com/apache/incubator-pegasus/admin-cli/executor/toolkits"
"github.com/apache/incubator-pegasus/admin-cli/util"
)
@@ -39,11 +40,11 @@ func MigrateAllReplicaToNodes(client *executor.Client, from
[]string, to []strin
GlobalBatchTable = batchTable
GlobalBatchTarget = batchTarget
if len(to) != targetCount {
- logPanic(fmt.Sprintf("please make sure the targets count ==
`--final_target` value: %d vs %d", len(to),
+ toolkits.LogPanic(fmt.Sprintf("please make sure the targets
count == `--final_target` value: %d vs %d", len(to),
targetCount))
}
- logWarn(fmt.Sprintf("you now migrate to target count assign to be %d in
final, "+
+ toolkits.LogWarn(fmt.Sprintf("you now migrate to target count assign to
be %d in final, "+
"please make sure it is ok! sleep 10s and then start",
targetCount))
time.Sleep(time.Second * 10)
@@ -75,14 +76,14 @@ func MigrateAllReplicaToNodes(client *executor.Client, from
[]string, to []strin
balanceFactor := 0
for {
if totalRemainingReplica <= 0 {
- logInfo("\n\n==============completed for all origin
nodes has been migrated================")
+ toolkits.LogInfo("\n\n==============completed for all
origin nodes has been migrated================")
return executor.ListNodes(client)
}
if currentOriginNode.String() == firstOrigin.String() {
originRound++
}
- logInfo(fmt.Sprintf("\n\n*******************[%d|%s]start
migrate replicas, remainingReplica=%d*****************",
+
toolkits.LogInfo(fmt.Sprintf("\n\n*******************[%d|%s]start migrate
replicas, remainingReplica=%d*****************",
balanceFactor, currentOriginNode.String(),
totalRemainingReplica))
currentOriginNode.downgradeAllReplicaToSecondary(client)
diff --git a/admin-cli/executor/toolkits/nodesmigrator/migrator.go
b/admin-cli/executor/toolkits/nodesmigrator/migrator.go
index 61304d07..6899b738 100644
--- a/admin-cli/executor/toolkits/nodesmigrator/migrator.go
+++ b/admin-cli/executor/toolkits/nodesmigrator/migrator.go
@@ -32,6 +32,7 @@ import (
"github.com/XiaoMi/pegasus-go-client/session"
migrator "github.com/apache/incubator-pegasus/admin-cli/client"
"github.com/apache/incubator-pegasus/admin-cli/executor"
+ "github.com/apache/incubator-pegasus/admin-cli/executor/toolkits"
"github.com/apache/incubator-pegasus/admin-cli/util"
)
@@ -52,7 +53,7 @@ func (m *Migrator) run(client *executor.Client, table string,
balanceFactor int,
for {
target := m.selectNextTargetNode(targets)
if target.String() == origin.String() {
- logInfo(fmt.Sprintf("completed for origin and target is
same: %s", origin.String()))
+ toolkits.LogInfo(fmt.Sprintf("completed for origin and
target is same: %s", origin.String()))
return m.getTotalRemainingReplicaCount()
}
@@ -60,7 +61,7 @@ func (m *Migrator) run(client *executor.Client, table string,
balanceFactor int,
m.updateOngoingActionList()
remainingCount := m.getRemainingReplicaCount(origin)
if remainingCount <= 0 ||
len(balanceTargets)+len(invalidTargets) >= len(targets) {
- logInfo(fmt.Sprintf("[%s]completed(remaining=%d,
balance=%d, invalid=%d, running_target=%d, final_target=%d) for no replicas can
be migrated",
+
toolkits.LogInfo(fmt.Sprintf("[%s]completed(remaining=%d, balance=%d,
invalid=%d, running_target=%d, final_target=%d) for no replicas can be
migrated",
table, remainingCount, len(balanceTargets),
len(invalidTargets), len(targets), len(m.targets)))
return m.getTotalRemainingReplicaCount()
}
@@ -69,7 +70,7 @@ func (m *Migrator) run(client *executor.Client, table string,
balanceFactor int,
currentCount := m.getCurrentReplicaCount(target)
if currentCount >= expectCount {
balanceTargets[target.String()] = 1
- logDebug(fmt.Sprintf("[%s]balance: no need migrate
replicas to %s, current=%d, expect=max(%d), total_balance=%d",
+ toolkits.LogDebug(fmt.Sprintf("[%s]balance: no need
migrate replicas to %s, current=%d, expect=max(%d), total_balance=%d",
table, target.String(), currentCount,
expectCount, len(balanceTargets)))
if len(m.ongoingActions.actionList) > 0 {
time.Sleep(10 * time.Second)
@@ -79,7 +80,7 @@ func (m *Migrator) run(client *executor.Client, table string,
balanceFactor int,
if !m.existValidReplica(origin, target) {
invalidTargets[target.String()] = 1
- logDebug(fmt.Sprintf("[%s]invalid: no invalid migrate
replicas to %s, total_invalid=%d",
+ toolkits.LogDebug(fmt.Sprintf("[%s]invalid: no invalid
migrate replicas to %s, total_invalid=%d",
table, target.String(), len(invalidTargets)))
if len(m.ongoingActions.actionList) > 0 {
time.Sleep(10 * time.Second)
@@ -89,7 +90,7 @@ func (m *Migrator) run(client *executor.Client, table string, balanceFactor int,
currentConcurrentCount := target.concurrent(m.ongoingActions)
if currentConcurrentCount == maxConcurrent {
- logDebug(fmt.Sprintf("[%s] %s has excceed the max
concurrent = %d", table, target.String(),
+ toolkits.LogDebug(fmt.Sprintf("[%s] %s has excceed the
max concurrent = %d", table, target.String(),
currentConcurrentCount))
time.Sleep(10 * time.Second)
continue
@@ -118,7 +119,7 @@ func (m *Migrator) selectNextTargetNode(targets []*util.PegasusNode) *MigratorNo
func (m *Migrator) updateNodesReplicaInfo(client *executor.Client, table
string) {
for {
if err := m.syncNodesReplicaInfo(client, table); err != nil {
- logDebug(fmt.Sprintf("[%s]table may be unhealthy: %s",
table, err.Error()))
+ toolkits.LogDebug(fmt.Sprintf("[%s]table may be
unhealthy: %s", table, err.Error()))
time.Sleep(10 * time.Second)
continue
}
@@ -240,7 +241,7 @@ func (m *Migrator) sendMigrateRequest(client *executor.Client, table string, ori
from := m.nodes[origin.String()]
to := m.nodes[target.String()]
if len(from.replicas) == 0 {
- logDebug(fmt.Sprintf("the node[%s] has no replica to migrate",
target.node.String()))
+ toolkits.LogDebug(fmt.Sprintf("the node[%s] has no replica to
migrate", target.node.String()))
return
}
@@ -255,12 +256,12 @@ func (m *Migrator) sendMigrateRequest(client *executor.Client, table string, ori
}
if to.contain(replica.gpid) {
- logDebug(fmt.Sprintf("actions[%s] target has existed
the replica", action.toString()))
+ toolkits.LogDebug(fmt.Sprintf("actions[%s] target has
existed the replica", action.toString()))
continue
}
if m.totalActions.exist(action) {
- logDebug(fmt.Sprintf("action[%s] has assgin other
task", action.toString()))
+ toolkits.LogDebug(fmt.Sprintf("action[%s] has assgin
other task", action.toString()))
continue
}
@@ -270,10 +271,10 @@ func (m *Migrator) sendMigrateRequest(client *executor.Client, table string, ori
if err != nil {
m.totalActions.delete(action)
m.ongoingActions.delete(action)
- logWarn(fmt.Sprintf("send failed: %s", err.Error()))
+ toolkits.LogWarn(fmt.Sprintf("send failed: %s",
err.Error()))
continue
}
- logInfo(fmt.Sprintf("[%s]send %s success, ongiong task = %d",
table, action.toString(), target.concurrent(m.ongoingActions)))
+ toolkits.LogInfo(fmt.Sprintf("[%s]send %s success, ongiong task
= %d", table, action.toString(), target.concurrent(m.ongoingActions)))
return
}
}
@@ -290,10 +291,10 @@ func (m *Migrator) updateOngoingActionList() {
for name, act := range m.ongoingActions.actionList {
node := m.nodes[act.to.String()]
if node.contain(act.replica.gpid) {
- logInfo(fmt.Sprintf("%s has completed", name))
+ toolkits.LogInfo(fmt.Sprintf("%s has completed", name))
m.ongoingActions.delete(act)
} else {
- logInfo(fmt.Sprintf("%s is running", name))
+ toolkits.LogInfo(fmt.Sprintf("%s is running", name))
}
}
}
diff --git a/admin-cli/executor/toolkits/nodesmigrator/nodes.go b/admin-cli/executor/toolkits/nodesmigrator/nodes.go
index 43d2bb3b..dc56b059 100644
--- a/admin-cli/executor/toolkits/nodesmigrator/nodes.go
+++ b/admin-cli/executor/toolkits/nodesmigrator/nodes.go
@@ -26,6 +26,7 @@ import (
"github.com/XiaoMi/pegasus-go-client/idl/base"
migrator "github.com/apache/incubator-pegasus/admin-cli/client"
"github.com/apache/incubator-pegasus/admin-cli/executor"
+ "github.com/apache/incubator-pegasus/admin-cli/executor/toolkits"
"github.com/apache/incubator-pegasus/admin-cli/util"
)
@@ -57,7 +58,7 @@ func (m *MigratorNode) downgradeAllReplicaToSecondary(client *executor.Client) {
for {
err := migrator.MigratePrimariesOut(client.Meta, m.node)
if err != nil {
- logDebug(fmt.Sprintf("migrate primary out of %s is
invalid now, err = %s\n", m.String(), err))
+ toolkits.LogDebug(fmt.Sprintf("migrate primary out of
%s is invalid now, err = %s\n", m.String(), err))
time.Sleep(10 * time.Second)
continue
}
@@ -75,23 +76,23 @@ func (m *MigratorNode) downgradeAllReplicaToSecondary(client *executor.Client) {
func (m *MigratorNode) checkIfNoPrimary(client *executor.Client) bool {
tables, err := client.Meta.ListAvailableApps()
if err != nil {
- logDebug(fmt.Sprintf("migrate primary out of %s is invalid when
list app, err = %s", m.String(), err))
+ toolkits.LogDebug(fmt.Sprintf("migrate primary out of %s is
invalid when list app, err = %s", m.String(), err))
return false
}
for _, tb := range tables {
partitions, err := migrator.ListPrimariesOnNode(client.Meta,
m.node, tb.AppName)
if err != nil {
- logDebug(fmt.Sprintf("migrate primary out of %s is
invalid when list primaries, err = %s", m.String(), err))
+ toolkits.LogDebug(fmt.Sprintf("migrate primary out of
%s is invalid when list primaries, err = %s", m.String(), err))
return false
}
if len(partitions) > 0 {
- logDebug(fmt.Sprintf("migrate primary out of %s is not
completed, current count = %d", m.String(), len(partitions)))
+ toolkits.LogDebug(fmt.Sprintf("migrate primary out of
%s is not completed, current count = %d", m.String(), len(partitions)))
return false
}
}
- logInfo(fmt.Sprintf("migrate primary out of %s successfully",
m.String()))
+ toolkits.LogInfo(fmt.Sprintf("migrate primary out of %s successfully",
m.String()))
return true
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]