This is an automated email from the ASF dual-hosted git repository.

zhaoliwei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git


The following commit(s) were added to refs/heads/master by this push:
     new e147550  feat(hotspot): add a function to start hotkey detecting in 
shell commands  (#605)
e147550 is described below

commit e147550acaf74ff30e46994204689473f9f36b0d
Author: Smilencer <[email protected]>
AuthorDate: Tue Sep 29 03:08:05 2020 -0500

    feat(hotspot): add a function to start hotkey detecting in shell commands  
(#605)
---
 rdsn                                  |   2 +-
 src/shell/command_utils.h             |  36 ++++++++++
 src/shell/commands.h                  |   5 +-
 src/shell/commands/detect_hotkey.cpp  | 126 ++++++++++++++++++++++++++++++++++
 src/shell/commands/disk_rebalance.cpp |  32 ---------
 src/shell/main.cpp                    |  10 +++
 6 files changed, 177 insertions(+), 34 deletions(-)

diff --git a/rdsn b/rdsn
index fafbd59..69102a7 160000
--- a/rdsn
+++ b/rdsn
@@ -1 +1 @@
-Subproject commit fafbd599d1df48b36bade4d08940ab79837aaf3b
+Subproject commit 69102a786f3b888155bc18b8b6c58031c7d2fd98
diff --git a/src/shell/command_utils.h b/src/shell/command_utils.h
index ea534a6..2aa480b 100644
--- a/src/shell/command_utils.h
+++ b/src/shell/command_utils.h
@@ -5,6 +5,42 @@
 #pragma once
 
 #include <map>
+#include <string>
+#include <set>
+
+#include "shell/argh.h"
+#include <dsn/dist/fmt_logging.h>
+
+inline bool validate_cmd(const argh::parser &cmd, // parsed command line to check
+                         const std::set<std::string> &params, // option names that must carry a value
+                         const std::set<std::string> &flags) // option names accepted as bare flags
+{ // Returns true when every check below passes; otherwise prints the reason to stderr and returns false.
+    if (cmd.size() > 1) { // NOTE(review): presumably counts positional args incl. the command name - confirm against argh
+        fmt::print(stderr, "too many params!\n");
+        return false;
+    }
+
+    for (const auto &param : cmd.params()) { // every name/value option must be listed in `params`
+        if (params.find(param.first) == params.end()) {
+            fmt::print(stderr, "unknown param {} = {}\n", param.first, 
param.second);
+            return false;
+        }
+    }
+
+    for (const auto &flag : cmd.flags()) { // options that were parsed without a value
+        if (params.find(flag) != params.end()) { // a value-taking option showed up with no value
+            fmt::print(stderr, "missing value of {}\n", flag);
+            return false;
+        }
+
+        if (flags.find(flag) == flags.end()) { // not a known flag either
+            fmt::print(stderr, "unknown flag {}\n", flag);
+            return false;
+        }
+    }
+
+    return true;
+}
 
 #define verify_logged(exp, ...)                                                
                    \
     do {                                                                       
                    \
diff --git a/src/shell/commands.h b/src/shell/commands.h
index 27618f0..af29c2c 100644
--- a/src/shell/commands.h
+++ b/src/shell/commands.h
@@ -25,7 +25,6 @@
 #include <pegasus/error.h>
 
 #include "command_executor.h"
-#include "command_utils.h"
 #include "command_helper.h"
 #include "args.h"
 
@@ -262,3 +261,7 @@ bool pause_bulk_load(command_executor *e, shell_context 
*sc, arguments args);
 bool restart_bulk_load(command_executor *e, shell_context *sc, arguments args);
 
 bool cancel_bulk_load(command_executor *e, shell_context *sc, arguments args);
+
+// == detect hotkey (see 'commands/detect_hotkey.cpp') == //
+
+bool detect_hotkey(command_executor *e, shell_context *sc, arguments args);
diff --git a/src/shell/commands/detect_hotkey.cpp 
b/src/shell/commands/detect_hotkey.cpp
new file mode 100644
index 0000000..900e70f
--- /dev/null
+++ b/src/shell/commands/detect_hotkey.cpp
@@ -0,0 +1,126 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "shell/commands.h"
+#include "shell/argh.h"
+#include <dsn/dist/replication/replication_types.h>
+
+bool generate_hotkey_request(dsn::replication::detect_hotkey_request &req, // out: request whose type/action/pid are filled in
+                             const std::string &hotkey_action, // "start" or "stop", matched case-insensitively
+                             const std::string &hotkey_type, // "read" or "write", matched case-insensitively
+                             int app_id, // first component of the gpid stored in req.pid
+                             int partition_index, // second component of the gpid stored in req.pid
+                             std::string &err_info) // out: message written only when false is returned
+{ // Returns true when both strings are recognized and req is fully populated; false otherwise.
+    if (!strcasecmp(hotkey_type.c_str(), "read")) { // strcasecmp returns 0 on a match
+        req.type = dsn::replication::hotkey_type::type::READ;
+    } else if (!strcasecmp(hotkey_type.c_str(), "write")) {
+        req.type = dsn::replication::hotkey_type::type::WRITE;
+    } else {
+        err_info = fmt::format("\"{}\" is an invalid hotkey type (should be 
'read' or 'write')\n",
+                               hotkey_type);
+        return false; // req has not been modified at this point
+    }
+
+    if (!strcasecmp(hotkey_action.c_str(), "start")) {
+        req.action = dsn::replication::detect_action::START;
+    } else if (!strcasecmp(hotkey_action.c_str(), "stop")) {
+        req.action = dsn::replication::detect_action::STOP;
+    } else {
+        err_info =
+            fmt::format("\"{}\" is an invalid hotkey detect action (should be 
'start' or 'stop')\n",
+                        hotkey_action);
+        return false; // note: req.type was already assigned above
+    }
+    req.pid = dsn::gpid(app_id, partition_index);
+    return true;
+}
+
+// TODO: (Tangyanzhao) merge 
hotspot_partition_calculator::send_detect_hotkey_request
+// Shell command: start or stop hotkey detection for one partition on one replica server.
+//
+// Parses the options from `args`, builds a detect_hotkey_request and sends it to the given
+// address through sc->ddl_client->detect_hotkey(). Every failure is reported on stderr.
+//
+// Parameters:
+//   e    - command table entry; not used by this command.
+//   sc   - shell context; only sc->ddl_client is used.
+//   args - argc/argv of the command line typed into the shell.
+//
+// Returns true only when the options are valid, the rpc returned ERR_OK and the response's
+// own error code is ERR_OK; false otherwise.
+bool detect_hotkey(command_executor *e, shell_context *sc, arguments args)
+{
+    // detect_hotkey
+    // <-a|--app_id num><-p|--partition_index num><-t|--hotkey_type read|write>
+    // <-c|--detect_action start|stop><-d|--address str>
+    //
+    // "--detect_action" is the name shown in the usage text above; "--hotkey_action" is the
+    // only long name this command originally parsed, so it is kept as a synonym.
+    const std::set<std::string> params = {"a",
+                                          "app_id",
+                                          "p",
+                                          "partition_index",
+                                          "c",
+                                          "detect_action",
+                                          "hotkey_action",
+                                          "t",
+                                          "hotkey_type",
+                                          "d",
+                                          "address"};
+    const std::set<std::string> flags = {};
+    argh::parser cmd(args.argc, args.argv, argh::parser::PREFER_PARAM_FOR_UNREG_OPTION);
+    if (!validate_cmd(cmd, params, flags)) {
+        return false;
+    }
+
+    int app_id;
+    if (!dsn::buf2int32(cmd({"-a", "--app_id"}).str(), app_id)) {
+        fmt::print(stderr, "\"{}\" is an invalid num\n", cmd({"-a", "--app_id"}).str());
+        return false;
+    }
+
+    int partition_index;
+    if (!dsn::buf2int32(cmd({"-p", "--partition_index"}).str(), partition_index)) {
+        fmt::print(stderr, "\"{}\" is an invalid num\n", cmd({"-p", "--partition_index"}).str());
+        return false;
+    }
+
+    dsn::rpc_address target_address;
+    std::string ip_str = cmd({"-d", "--address"}).str();
+    if (!target_address.from_string_ipv4(ip_str.c_str())) {
+        // Errors go to stderr, like every other failure path of this command.
+        fmt::print(stderr, "invalid ip, error={}\n", ip_str);
+        return false;
+    }
+
+    std::string err_info;
+    // The first name present on the command line wins.
+    std::string hotkey_action = cmd({"-c", "--detect_action", "--hotkey_action"}).str();
+    std::string hotkey_type = cmd({"-t", "--hotkey_type"}).str();
+    dsn::replication::detect_hotkey_request req;
+    if (!generate_hotkey_request(
+            req, hotkey_action, hotkey_type, app_id, partition_index, err_info)) {
+        // err_info embeds raw user input, so it must never be used as the format string:
+        // braces in the input would make fmt throw instead of printing the message.
+        fmt::print(stderr, "{}", err_info);
+        return false;
+    }
+
+    detect_hotkey_response resp;
+    auto err = sc->ddl_client->detect_hotkey(dsn::rpc_address(target_address), req, resp);
+    if (err != dsn::ERR_OK) {
+        // The rpc itself failed; resp is not inspected.
+        fmt::print(stderr,
+                   "Hotkey detect rpc sending failed, in {}.{}, error_hint:{}\n",
+                   app_id,
+                   partition_index,
+                   err.to_string());
+        return false;
+    }
+
+    if (resp.err != dsn::ERR_OK) {
+        // The rpc was delivered but the response carries an error.
+        fmt::print(stderr,
+                   "Hotkey detect rpc performed failed, in {}.{}, error_hint:{} {}\n",
+                   app_id,
+                   partition_index,
+                   resp.err,
+                   resp.err_hint);
+        return false;
+    }
+
+    return true;
+}
diff --git a/src/shell/commands/disk_rebalance.cpp 
b/src/shell/commands/disk_rebalance.cpp
index 97462c9..6ced3dd 100644
--- a/src/shell/commands/disk_rebalance.cpp
+++ b/src/shell/commands/disk_rebalance.cpp
@@ -10,40 +10,8 @@
 #include <fmt/ostream.h>
 #include <dsn/utility/errors.h>
 #include <dsn/utility/output_utils.h>
-#include <dsn/utility/string_conv.h>
 #include <dsn/dist/replication/duplication_common.h>
 
-bool validate_cmd(const argh::parser &cmd,
-                  const std::set<std::string> &params,
-                  const std::set<std::string> &flags)
-{
-    if (cmd.size() > 1) {
-        fmt::print(stderr, "too many params!\n");
-        return false;
-    }
-
-    for (const auto &param : cmd.params()) {
-        if (params.find(param.first) == params.end()) {
-            fmt::print(stderr, "unknown param {} = {}\n", param.first, 
param.second);
-            return false;
-        }
-    }
-
-    for (const auto &flag : cmd.flags()) {
-        if (params.find(flag) != params.end()) {
-            fmt::print(stderr, "missing value of {}\n", flag);
-            return false;
-        }
-
-        if (flags.find(flag) == flags.end()) {
-            fmt::print(stderr, "unknown flag {}\n", flag);
-            return false;
-        }
-    }
-
-    return true;
-}
-
 bool query_disk_info(
     shell_context *sc,
     const argh::parser &cmd,
diff --git a/src/shell/main.cpp b/src/shell/main.cpp
index ea2688b..c747556 100644
--- a/src/shell/main.cpp
+++ b/src/shell/main.cpp
@@ -475,6 +475,16 @@ static command_executor commands[] = {
         cancel_bulk_load,
     },
     {
+        "detect_hotkey",
+        "start or stop hotkey detection on a replica of a replica server",
+        "<-a|--app_id num> "
+        "<-p|--partition_index num> "
+        "<-t|--hotkey_type read|write> "
+        "<-c|--detect_action start|stop> "
+        "<-d|--address str>",
+        detect_hotkey,
+    },
+    {
         "exit", "exit shell", "", exit_shell,
     },
     {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to