This is an automated email from the ASF dual-hosted git repository.
hulk pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks-controller.git
The following commit(s) were added to refs/heads/unstable by this push:
new ce3cfb6 Add support of the failover command (#173)
ce3cfb6 is described below
commit ce3cfb692cb98884f6a55352f12201992cfd808b
Author: hulk <[email protected]>
AuthorDate: Sat May 4 12:48:50 2024 +0800
Add support of the failover command (#173)
---
cmd/client/command/create.go | 2 +-
cmd/client/command/failover.go | 108 +++++++++++++++++++++++++++++++++++++++++
cmd/client/main.go | 1 +
server/api/shard.go | 20 +++++++-
4 files changed, 129 insertions(+), 2 deletions(-)
diff --git a/cmd/client/command/create.go b/cmd/client/command/create.go
index fb5fcf9..96850a4 100644
--- a/cmd/client/command/create.go
+++ b/cmd/client/command/create.go
@@ -139,7 +139,7 @@ func createCluster(cli *client, options *CreateOptions)
error {
SetPathParam("namespace", options.namespace).
SetBody(map[string]interface{}{
"name": options.cluster,
- "replica": options.replica,
+ "replicas": options.replica,
"nodes": options.nodes,
"password": options.password,
}).
diff --git a/cmd/client/command/failover.go b/cmd/client/command/failover.go
new file mode 100644
index 0000000..ba869c5
--- /dev/null
+++ b/cmd/client/command/failover.go
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package command
+
+import (
+ "fmt"
+ "strconv"
+
+ "github.com/spf13/cobra"
+)
+
+type FailoverOptions struct {
+ namespace string
+ cluster string
+ preferred string
+}
+
+var failoverOptions FailoverOptions
+
+var FailoverCommand = &cobra.Command{
+ Use: "failover",
+ Short: "Failover the master of a shard",
+ Example: `
+# Failover the master of a shard
+kvctl failover shard <shard_index> -n <namespace> -c <cluster>
+
+# Failover the master of a shard with preferred slave
+kvctl failover shard <shard_index> --preferred <node_id> -n <namespace> -c
<cluster>
+`,
+ PreRunE: failoverPreRun,
+ RunE: func(cmd *cobra.Command, args []string) error {
+ if len(args) < 2 {
+ return fmt.Errorf("missing shard index, plese specify
the shard index")
+ }
+ host, _ := cmd.Flags().GetString("host")
+ client := newClient(host)
+ resource := args[0]
+ switch resource {
+ case "shard":
+ shardIndex, err := strconv.Atoi(args[1])
+ if err != nil {
+ return fmt.Errorf("invalid shard index: %s",
args[1])
+ }
+ return failoverShard(client, &failoverOptions,
shardIndex)
+ default:
+ return fmt.Errorf("unsupported resource type: %s",
resource)
+ }
+ },
+}
+
+func failoverPreRun(cmd *cobra.Command, args []string) error {
+ if len(args) < 1 {
+ return fmt.Errorf("missing resource name, must be [shard]")
+ }
+ if failoverOptions.namespace == "" {
+ return fmt.Errorf("namespace is required")
+ }
+ if failoverOptions.cluster == "" {
+ return fmt.Errorf("cluster is required")
+ }
+ return nil
+}
+
+func failoverShard(client *client, options *FailoverOptions, shardIndex int)
error {
+ rsp, err := client.restyCli.R().
+ SetPathParam("namespace", options.namespace).
+ SetPathParam("cluster", options.cluster).
+ SetPathParam("shard", strconv.Itoa(shardIndex)).
+
Post("/namespaces/{namespace}/clusters/{cluster}/shards/{shard}/failover")
+ if err != nil {
+ return err
+ }
+ if rsp.IsError() {
+ return unmarshalError(rsp.Body())
+ }
+ var result struct {
+ NewMasterID string `json:"new_master_id"`
+ }
+ if err := unmarshalData(rsp.Body(), &result); err != nil {
+ return err
+ }
+ printLine("failover shard %d successfully, new master id: %s.",
shardIndex, result.NewMasterID)
+ return nil
+}
+
+func init() {
+ FailoverCommand.Flags().StringVarP(&failoverOptions.namespace,
"namespace", "n", "", "The namespace of the cluster")
+ FailoverCommand.Flags().StringVarP(&failoverOptions.cluster, "cluster",
"c", "", "The name of the cluster")
+ FailoverCommand.Flags().StringVarP(&failoverOptions.preferred,
"preferred", "", "", "The preferred slave node id")
+}
diff --git a/cmd/client/main.go b/cmd/client/main.go
index 93e2251..214cd65 100644
--- a/cmd/client/main.go
+++ b/cmd/client/main.go
@@ -58,6 +58,7 @@ func init() {
rootCommand.AddCommand(command.DeleteCommand)
rootCommand.AddCommand(command.ImportCommand)
rootCommand.AddCommand(command.MigrateCommand)
+ rootCommand.AddCommand(command.FailoverCommand)
rootCommand.SilenceUsage = true
rootCommand.SilenceErrors = true
diff --git a/server/api/shard.go b/server/api/shard.go
index 9ccf258..ff673c8 100644
--- a/server/api/shard.go
+++ b/server/api/shard.go
@@ -21,6 +21,7 @@ package api
import (
"errors"
+ "io"
"strconv"
"github.com/gin-gonic/gin"
@@ -116,9 +117,26 @@ func (handler *ShardHandler) Remove(c *gin.Context) {
func (handler *ShardHandler) Failover(c *gin.Context) {
ns := c.Param("namespace")
cluster, _ := c.MustGet(consts.ContextKeyCluster).(*store.Cluster)
+
+ var req struct {
+ PreferredNodeID string `json:"preferred_node_id"`
+ }
+ if c.Request.Body != nil {
+ body, err := io.ReadAll(c.Request.Body)
+ if err != nil {
+ helper.ResponseBadRequest(c, err)
+ return
+ }
+ if len(body) > 0 {
+ if err := c.BindJSON(&req); err != nil {
+ helper.ResponseBadRequest(c, err)
+ return
+ }
+ }
+ }
// We have checked this if statement in middleware.RequiredClusterShard
shardIndex, _ := strconv.Atoi(c.Param("shard"))
- newMasterNodeID, err := cluster.PromoteNewMaster(c, shardIndex, "", "")
+ newMasterNodeID, err := cluster.PromoteNewMaster(c, shardIndex, "",
req.PreferredNodeID)
if err != nil {
helper.ResponseError(c, err)
return