This is an automated email from the ASF dual-hosted git repository.

hulk pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks-controller.git


The following commit(s) were added to refs/heads/unstable by this push:
     new ce3cfb6  Add support of the failover command (#173)
ce3cfb6 is described below

commit ce3cfb692cb98884f6a55352f12201992cfd808b
Author: hulk <[email protected]>
AuthorDate: Sat May 4 12:48:50 2024 +0800

    Add support of the failover command (#173)
---
 cmd/client/command/create.go   |   2 +-
 cmd/client/command/failover.go | 108 +++++++++++++++++++++++++++++++++++++++++
 cmd/client/main.go             |   1 +
 server/api/shard.go            |  20 +++++++-
 4 files changed, 129 insertions(+), 2 deletions(-)

diff --git a/cmd/client/command/create.go b/cmd/client/command/create.go
index fb5fcf9..96850a4 100644
--- a/cmd/client/command/create.go
+++ b/cmd/client/command/create.go
@@ -139,7 +139,7 @@ func createCluster(cli *client, options *CreateOptions) 
error {
                SetPathParam("namespace", options.namespace).
                SetBody(map[string]interface{}{
                        "name":     options.cluster,
-                       "replica":  options.replica,
+                       "replicas": options.replica,
                        "nodes":    options.nodes,
                        "password": options.password,
                }).
diff --git a/cmd/client/command/failover.go b/cmd/client/command/failover.go
new file mode 100644
index 0000000..ba869c5
--- /dev/null
+++ b/cmd/client/command/failover.go
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package command
+
+import (
+       "fmt"
+       "strconv"
+
+       "github.com/spf13/cobra"
+)
+
+type FailoverOptions struct {
+       namespace string
+       cluster   string
+       preferred string
+}
+
+var failoverOptions FailoverOptions
+
+var FailoverCommand = &cobra.Command{
+       Use:   "failover",
+       Short: "Failover the master of a shard",
+       Example: `
+# Failover the master of a shard
+kvctl failover shard <shard_index> -n <namespace> -c <cluster>
+
+# Failover the master of a shard with preferred slave
+kvctl failover shard <shard_index> --preferred <node_id> -n <namespace> -c 
<cluster>
+`,
+       PreRunE: failoverPreRun,
+       RunE: func(cmd *cobra.Command, args []string) error {
+               if len(args) < 2 {
+                       return fmt.Errorf("missing shard index, plese specify 
the shard index")
+               }
+               host, _ := cmd.Flags().GetString("host")
+               client := newClient(host)
+               resource := args[0]
+               switch resource {
+               case "shard":
+                       shardIndex, err := strconv.Atoi(args[1])
+                       if err != nil {
+                               return fmt.Errorf("invalid shard index: %s", 
args[1])
+                       }
+                       return failoverShard(client, &failoverOptions, 
shardIndex)
+               default:
+                       return fmt.Errorf("unsupported resource type: %s", 
resource)
+               }
+       },
+}
+
+func failoverPreRun(cmd *cobra.Command, args []string) error {
+       if len(args) < 1 {
+               return fmt.Errorf("missing resource name, must be [shard]")
+       }
+       if failoverOptions.namespace == "" {
+               return fmt.Errorf("namespace is required")
+       }
+       if failoverOptions.cluster == "" {
+               return fmt.Errorf("cluster is required")
+       }
+       return nil
+}
+
+func failoverShard(client *client, options *FailoverOptions, shardIndex int) 
error {
+       rsp, err := client.restyCli.R().
+               SetPathParam("namespace", options.namespace).
+               SetPathParam("cluster", options.cluster).
+               SetPathParam("shard", strconv.Itoa(shardIndex)).
+               
Post("/namespaces/{namespace}/clusters/{cluster}/shards/{shard}/failover")
+       if err != nil {
+               return err
+       }
+       if rsp.IsError() {
+               return unmarshalError(rsp.Body())
+       }
+       var result struct {
+               NewMasterID string `json:"new_master_id"`
+       }
+       if err := unmarshalData(rsp.Body(), &result); err != nil {
+               return err
+       }
+       printLine("failover shard %d successfully, new master id: %s.", 
shardIndex, result.NewMasterID)
+       return nil
+}
+
+func init() {
+       FailoverCommand.Flags().StringVarP(&failoverOptions.namespace, 
"namespace", "n", "", "The namespace of the cluster")
+       FailoverCommand.Flags().StringVarP(&failoverOptions.cluster, "cluster", 
"c", "", "The name of the cluster")
+       FailoverCommand.Flags().StringVarP(&failoverOptions.preferred, 
"preferred", "", "", "The preferred slave node id")
+}
diff --git a/cmd/client/main.go b/cmd/client/main.go
index 93e2251..214cd65 100644
--- a/cmd/client/main.go
+++ b/cmd/client/main.go
@@ -58,6 +58,7 @@ func init() {
        rootCommand.AddCommand(command.DeleteCommand)
        rootCommand.AddCommand(command.ImportCommand)
        rootCommand.AddCommand(command.MigrateCommand)
+       rootCommand.AddCommand(command.FailoverCommand)
 
        rootCommand.SilenceUsage = true
        rootCommand.SilenceErrors = true
diff --git a/server/api/shard.go b/server/api/shard.go
index 9ccf258..ff673c8 100644
--- a/server/api/shard.go
+++ b/server/api/shard.go
@@ -21,6 +21,7 @@ package api
 
 import (
        "errors"
+       "io"
        "strconv"
 
        "github.com/gin-gonic/gin"
@@ -116,9 +117,26 @@ func (handler *ShardHandler) Remove(c *gin.Context) {
 func (handler *ShardHandler) Failover(c *gin.Context) {
        ns := c.Param("namespace")
        cluster, _ := c.MustGet(consts.ContextKeyCluster).(*store.Cluster)
+
+       var req struct {
+               PreferredNodeID string `json:"preferred_node_id"`
+       }
+       if c.Request.Body != nil {
+               body, err := io.ReadAll(c.Request.Body)
+               if err != nil {
+                       helper.ResponseBadRequest(c, err)
+                       return
+               }
+               if len(body) > 0 {
+                       if err := c.BindJSON(&req); err != nil {
+                               helper.ResponseBadRequest(c, err)
+                               return
+                       }
+               }
+       }
        // We have checked this if statement in middleware.RequiredClusterShard
        shardIndex, _ := strconv.Atoi(c.Param("shard"))
-       newMasterNodeID, err := cluster.PromoteNewMaster(c, shardIndex, "", "")
+       newMasterNodeID, err := cluster.PromoteNewMaster(c, shardIndex, "", 
req.PreferredNodeID)
        if err != nil {
                helper.ResponseError(c, err)
                return

Reply via email to