This is an automated email from the ASF dual-hosted git repository.

wutao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git


The following commit(s) were added to refs/heads/master by this push:
     new 412492d  feat(hotspot): add a function to start hotkey detecting in 
hotspot_partition_calculator (#601)
412492d is described below

commit 412492d3cd879e28b842c93adedb3923a79168c2
Author: Smilencer <[email protected]>
AuthorDate: Wed Sep 16 23:32:49 2020 +0800

    feat(hotspot): add a function to start hotkey detecting in 
hotspot_partition_calculator (#601)
---
 src/base/rrdb_types.cpp                     | 24 ++++++-------
 src/idl/rrdb.thrift                         |  2 +-
 src/include/rrdb/rrdb_types.h               | 14 ++++----
 src/server/hotspot_partition_calculator.cpp | 54 +++++++++++++++++++++++++++++
 src/server/hotspot_partition_calculator.h   |  4 +++
 src/server/pegasus_read_service.h           |  2 ++
 6 files changed, 79 insertions(+), 21 deletions(-)

diff --git a/src/base/rrdb_types.cpp b/src/base/rrdb_types.cpp
index c940a2b..53ac7c7 100644
--- a/src/base/rrdb_types.cpp
+++ b/src/base/rrdb_types.cpp
@@ -4608,9 +4608,9 @@ hotkey_detect_request::~hotkey_detect_request() throw() {}
 
 void hotkey_detect_request::__set_type(const hotkey_type::type val) { 
this->type = val; }
 
-void hotkey_detect_request::__set_operation(const hotkey_detect_action::type 
val)
+void hotkey_detect_request::__set_action(const hotkey_detect_action::type val)
 {
-    this->operation = val;
+    this->action = val;
 }
 
 uint32_t hotkey_detect_request::read(::apache::thrift::protocol::TProtocol 
*iprot)
@@ -4646,8 +4646,8 @@ uint32_t 
hotkey_detect_request::read(::apache::thrift::protocol::TProtocol *ipro
             if (ftype == ::apache::thrift::protocol::T_I32) {
                 int32_t ecast135;
                 xfer += iprot->readI32(ecast135);
-                this->operation = (hotkey_detect_action::type)ecast135;
-                this->__isset.operation = true;
+                this->action = (hotkey_detect_action::type)ecast135;
+                this->__isset.action = true;
             } else {
                 xfer += iprot->skip(ftype);
             }
@@ -4674,8 +4674,8 @@ uint32_t 
hotkey_detect_request::write(::apache::thrift::protocol::TProtocol *opr
     xfer += oprot->writeI32((int32_t)this->type);
     xfer += oprot->writeFieldEnd();
 
-    xfer += oprot->writeFieldBegin("operation", 
::apache::thrift::protocol::T_I32, 2);
-    xfer += oprot->writeI32((int32_t)this->operation);
+    xfer += oprot->writeFieldBegin("action", 
::apache::thrift::protocol::T_I32, 2);
+    xfer += oprot->writeI32((int32_t)this->action);
     xfer += oprot->writeFieldEnd();
 
     xfer += oprot->writeFieldStop();
@@ -4687,33 +4687,33 @@ void swap(hotkey_detect_request &a, 
hotkey_detect_request &b)
 {
     using ::std::swap;
     swap(a.type, b.type);
-    swap(a.operation, b.operation);
+    swap(a.action, b.action);
     swap(a.__isset, b.__isset);
 }
 
 hotkey_detect_request::hotkey_detect_request(const hotkey_detect_request 
&other136)
 {
     type = other136.type;
-    operation = other136.operation;
+    action = other136.action;
     __isset = other136.__isset;
 }
 hotkey_detect_request::hotkey_detect_request(hotkey_detect_request &&other137)
 {
     type = std::move(other137.type);
-    operation = std::move(other137.operation);
+    action = std::move(other137.action);
     __isset = std::move(other137.__isset);
 }
 hotkey_detect_request &hotkey_detect_request::operator=(const 
hotkey_detect_request &other138)
 {
     type = other138.type;
-    operation = other138.operation;
+    action = other138.action;
     __isset = other138.__isset;
     return *this;
 }
 hotkey_detect_request &hotkey_detect_request::operator=(hotkey_detect_request 
&&other139)
 {
     type = std::move(other139.type);
-    operation = std::move(other139.operation);
+    action = std::move(other139.action);
     __isset = std::move(other139.__isset);
     return *this;
 }
@@ -4723,7 +4723,7 @@ void hotkey_detect_request::printTo(std::ostream &out) 
const
     out << "hotkey_detect_request(";
     out << "type=" << to_string(type);
     out << ", "
-        << "operation=" << to_string(operation);
+        << "action=" << to_string(action);
     out << ")";
 }
 
diff --git a/src/idl/rrdb.thrift b/src/idl/rrdb.thrift
index 9b874df..fdf05f8 100644
--- a/src/idl/rrdb.thrift
+++ b/src/idl/rrdb.thrift
@@ -293,7 +293,7 @@ struct duplicate_response
 
 struct hotkey_detect_request {
     1: hotkey_type type
-    2: hotkey_detect_action operation
+    2: hotkey_detect_action action
 }
 
 struct hotkey_detect_response {
diff --git a/src/include/rrdb/rrdb_types.h b/src/include/rrdb/rrdb_types.h
index 2c6a210..14683ac 100644
--- a/src/include/rrdb/rrdb_types.h
+++ b/src/include/rrdb/rrdb_types.h
@@ -1967,9 +1967,9 @@ inline std::ostream &operator<<(std::ostream &out, const 
duplicate_response &obj
 
 typedef struct _hotkey_detect_request__isset
 {
-    _hotkey_detect_request__isset() : type(false), operation(false) {}
+    _hotkey_detect_request__isset() : type(false), action(false) {}
     bool type : 1;
-    bool operation : 1;
+    bool action : 1;
 } _hotkey_detect_request__isset;
 
 class hotkey_detect_request
@@ -1979,25 +1979,23 @@ public:
     hotkey_detect_request(hotkey_detect_request &&);
     hotkey_detect_request &operator=(const hotkey_detect_request &);
     hotkey_detect_request &operator=(hotkey_detect_request &&);
-    hotkey_detect_request() : type((hotkey_type::type)0), 
operation((hotkey_detect_action::type)0)
-    {
-    }
+    hotkey_detect_request() : type((hotkey_type::type)0), 
action((hotkey_detect_action::type)0) {}
 
     virtual ~hotkey_detect_request() throw();
     hotkey_type::type type;
-    hotkey_detect_action::type operation;
+    hotkey_detect_action::type action;
 
     _hotkey_detect_request__isset __isset;
 
     void __set_type(const hotkey_type::type val);
 
-    void __set_operation(const hotkey_detect_action::type val);
+    void __set_action(const hotkey_detect_action::type val);
 
     bool operator==(const hotkey_detect_request &rhs) const
     {
         if (!(type == rhs.type))
             return false;
-        if (!(operation == rhs.operation))
+        if (!(action == rhs.action))
             return false;
         return true;
     }
diff --git a/src/server/hotspot_partition_calculator.cpp 
b/src/server/hotspot_partition_calculator.cpp
index 865814e..839092d 100644
--- a/src/server/hotspot_partition_calculator.cpp
+++ b/src/server/hotspot_partition_calculator.cpp
@@ -21,6 +21,13 @@
 #include <math.h>
 #include <dsn/dist/fmt_logging.h>
 #include <dsn/utility/flags.h>
+#include <dsn/tool-api/rpc_address.h>
+#include <dsn/tool-api/group_address.h>
+#include <dsn/utility/error_code.h>
+#include <rrdb/rrdb_types.h>
+#include <dsn/dist/replication/duplication_common.h>
+#include <dsn/tool-api/task_tracker.h>
+#include "pegasus_read_service.h"
 
 namespace pegasus {
 namespace server {
@@ -98,5 +105,52 @@ void hotspot_partition_calculator::data_analyse()
     }
 }
 
+// TODO:(TangYanzhao) call this function to start hotkey detection
+/*static*/ void hotspot_partition_calculator::send_hotkey_detect_request(
+    const std::string &app_name,
+    const uint64_t partition_index,
+    const dsn::apps::hotkey_type::type hotkey_type,
+    const dsn::apps::hotkey_detect_action::type action)
+{
+    auto request = std::make_unique<dsn::apps::hotkey_detect_request>();
+    request->type = hotkey_type;
+    request->action = action;
+    ddebug_f("{} {} hotkey detection in {}.{}",
+             (action == dsn::apps::hotkey_detect_action::STOP) ? "Stop" : 
"Start",
+             (hotkey_type == dsn::apps::hotkey_type::WRITE) ? "write" : "read",
+             app_name,
+             partition_index);
+    dsn::rpc_address meta_server;
+    meta_server.assign_group("meta-servers");
+    std::vector<dsn::rpc_address> meta_servers;
+    replica_helper::load_meta_servers(meta_servers);
+    for (const auto &address : meta_servers) {
+        meta_server.group_address()->add(address);
+    }
+    auto cluster_name = dsn::replication::get_current_cluster_name();
+    auto resolver = partition_resolver::get_resolver(cluster_name, 
meta_servers, app_name.c_str());
+    dsn::task_tracker tracker;
+    detect_hotkey_rpc rpc(
+        std::move(request), RPC_DETECT_HOTKEY, std::chrono::seconds(10), 
partition_index);
+    rpc.call(resolver,
+             &tracker,
+             [app_name, partition_index](dsn::error_code error) {
+                 if (error != dsn::ERR_OK) {
+                     derror_f("Hotkey detect rpc sending failed, in {}.{}, 
error_hint:{}",
+                              app_name,
+                              partition_index,
+                              error.to_string());
+                 }
+             })
+        ->wait();
+    if (rpc.response().err != dsn::ERR_OK) {
+        derror_f("Hotkey detect rpc sending failed, in {}.{}, error_hint:{} 
{}",
+                 app_name,
+                 partition_index,
+                 rpc.response().err,
+                 rpc.response().err_hint);
+    }
+}
+
 } // namespace server
 } // namespace pegasus
diff --git a/src/server/hotspot_partition_calculator.h 
b/src/server/hotspot_partition_calculator.h
index c950ebe..8b13718 100644
--- a/src/server/hotspot_partition_calculator.h
+++ b/src/server/hotspot_partition_calculator.h
@@ -37,6 +37,10 @@ public:
     void data_aggregate(const std::vector<row_data> &partitions);
     // analyse the saved data to find hotspot partition
     void data_analyse();
+    static void send_hotkey_detect_request(const std::string &app_name,
+                                           const uint64_t partition_index,
+                                           const dsn::apps::hotkey_type::type 
hotkey_type,
+                                           const 
dsn::apps::hotkey_detect_action::type action);
 
 private:
     const std::string _app_name;
diff --git a/src/server/pegasus_read_service.h 
b/src/server/pegasus_read_service.h
index 67b6a80..fe66637 100644
--- a/src/server/pegasus_read_service.h
+++ b/src/server/pegasus_read_service.h
@@ -32,6 +32,8 @@ typedef ::dsn::rpc_holder<::dsn::blob, 
dsn::apps::ttl_response> ttl_rpc;
 typedef ::dsn::rpc_holder<::dsn::apps::get_scanner_request, 
dsn::apps::scan_response>
     get_scanner_rpc;
 typedef ::dsn::rpc_holder<::dsn::apps::scan_request, dsn::apps::scan_response> 
scan_rpc;
+typedef ::dsn::rpc_holder<::dsn::apps::hotkey_detect_request, 
dsn::apps::hotkey_detect_response>
+    detect_hotkey_rpc;
 
 class pegasus_read_service : public dsn::replication::replication_app_base,
                              public 
dsn::replication::storage_serverlet<pegasus_read_service>


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to