This is an automated email from the ASF dual-hosted git repository.

yuchenhe pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git


The following commit(s) were added to refs/heads/master by this push:
     new 3b96b26  feat(hotkey): build a fundamental framework of hotkey 
detection (#603)
3b96b26 is described below

commit 3b96b264f48906500ac276f4f9194adf6867837a
Author: Smilencer <[email protected]>
AuthorDate: Mon Oct 12 15:59:09 2020 +0800

    feat(hotkey): build a fundamental framework of hotkey detection (#603)
---
 rdsn                                    |  2 +-
 src/server/hotkey_collector.cpp         | 37 ++++++++++++++++
 src/server/hotkey_collector.h           | 75 +++++++++++++++++++++++++++++++++
 src/server/pegasus_server_impl.cpp      | 24 +++++++++++
 src/server/pegasus_server_impl.h        |  7 +++
 src/server/pegasus_server_impl_init.cpp |  4 ++
 src/server/test/CMakeLists.txt          |  1 +
 7 files changed, 149 insertions(+), 1 deletion(-)

diff --git a/rdsn b/rdsn
index 69102a7..ccecd59 160000
--- a/rdsn
+++ b/rdsn
@@ -1 +1 @@
-Subproject commit 69102a786f3b888155bc18b8b6c58031c7d2fd98
+Subproject commit ccecd5996a7b5c5b3281860186b30048a2ee4763
diff --git a/src/server/hotkey_collector.cpp b/src/server/hotkey_collector.cpp
new file mode 100644
index 0000000..c05e33c
--- /dev/null
+++ b/src/server/hotkey_collector.cpp
@@ -0,0 +1,37 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "hotkey_collector.h"
+
+namespace pegasus {
+namespace server {
+
+// TODO: (Tangyanzhao) implement these functions
+void hotkey_collector::handle_rpc(const 
dsn::replication::detect_hotkey_request &req,
+                                  dsn::replication::detect_hotkey_response 
&resp)
+{
+}
+
+void hotkey_collector::capture_raw_key(const dsn::blob &raw_key, int64_t 
weight)
+{
+    // TODO: (Tangyanzhao) Add a check to verify that the input is a raw key
+}
+
+void hotkey_collector::capture_hash_key(const dsn::blob &hash_key, int64_t 
weight) {}
+
+} // namespace server
+} // namespace pegasus
diff --git a/src/server/hotkey_collector.h b/src/server/hotkey_collector.h
new file mode 100644
index 0000000..57c7482
--- /dev/null
+++ b/src/server/hotkey_collector.h
@@ -0,0 +1,75 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <dsn/utility/string_view.h>
+#include <dsn/dist/replication/replication_types.h>
+
+namespace pegasus {
+namespace server {
+
+//    hotkey_collector is responsible for finding the hot keys after the partition
+//    was detected to be hot. The two types of hotkey, READ & WRITE, are 
detected
+//    separately.
+//
+//    +--------------------+  
+----------------------------------------------------+
+//    |   Replica server   |  | Hotkey collector                               
    |
+//    |                    |  | 
+-----------------------------------------------+  |
+//    | +----------------+ |  | | Coarse capture                               
 |  |
+//    | |                | |--> |                 +----------+                 
 |  |
+//    | |  RPC received  | || | |                 |   Data   |                 
 |  |
+//    | |                | || | |                 +-----+----+                 
 |  |
+//    | +-------+--------+ || | |                       |                      
 |  |
+//    |         |          || | |  +---------------+----v--+-------+---------+ 
 |  |
+//    |         v          || | |  |       |Hot    |       |       |         | 
 |  |
+//    | +-------+--------+ || | |  |Bucket |Bucket |Bucket |Bucket |Bucket   | 
 |  |
+//    | |   Replication  | || | |  +-----------+-----------------------------+ 
 |  |
+//    | | (only on the   | || | |              |                               
 |  |
+//    | |  write path)   | || | 
+--------------|--------------------------------+  |
+//    | +-------+--------+ || |             +--v---+                           
    |
+//    |         |          || |             | Data |                           
    |
+//    |         v          || |             +------+                           
    |
+//    | +-------+--------+ || |          +-----|-------+-------------+         
    |
+//    | |                | || |   
+------|-------------|-------------|---------+   |
+//    | |  Capture data  ---| |   | Fine |capture      |             |         
|   |
+//    | |                | |  |   |      |             |             |         
|   |
+//    | +-------+--------+ |  |   | +----v----+   +----v----+   +----v----+    
|   |
+//    |         |          |  |   | |  queue  |   |  queue  |   |  queue  |    
|   |
+//    |         v          |  |   | +----+----+   +----+----+   +----+----+    
|   |
+//    | +-------+--------+ |  |   |      |             |             |         
|   |
+//    | |                | |  |   | +----v-------------v-------------v------+  
|   |
+//    | |   Place data   | |  |   | |             Analysis pool             |  
|   |
+//    | |   to the disk  | |  |   | +-----------------|---------------------+  
|   |
+//    | |                | |  |   
+-------------------|------------------------+   |
+//    | +----------------+ |  |                       v                        
    |
+//    |                    |  |                     Hotkey                     
    |
+//    +--------------------+  
+----------------------------------------------------+
+
+class hotkey_collector
+{
+public:
+    // TODO: (Tangyanzhao) capture_*_key should be consistent with hotspot 
detection
+    // weight: calculate the weight according to the specific situation
+    void capture_raw_key(const dsn::blob &raw_key, int64_t weight);
+    void capture_hash_key(const dsn::blob &hash_key, int64_t weight);
+    void handle_rpc(const dsn::replication::detect_hotkey_request &req,
+                    /*out*/ dsn::replication::detect_hotkey_response &resp);
+};
+
+} // namespace server
+} // namespace pegasus
diff --git a/src/server/pegasus_server_impl.cpp 
b/src/server/pegasus_server_impl.cpp
index 577c907..122c638 100644
--- a/src/server/pegasus_server_impl.cpp
+++ b/src/server/pegasus_server_impl.cpp
@@ -22,6 +22,7 @@
 #include "capacity_unit_calculator.h"
 #include "pegasus_server_write.h"
 #include "meta_store.h"
+#include "hotkey_collector.h"
 
 using namespace dsn::literals::chrono_literals;
 
@@ -2798,5 +2799,28 @@ void 
pegasus_server_impl::set_ingestion_status(dsn::replication::ingestion_statu
     _ingestion_status = status;
 }
 
+void pegasus_server_impl::on_detect_hotkey(const 
dsn::replication::detect_hotkey_request &req,
+                                           
dsn::replication::detect_hotkey_response &resp)
+{
+
+    if (dsn_unlikely(req.action != dsn::replication::detect_action::START &&
+                     req.action != dsn::replication::detect_action::STOP)) {
+        resp.err = dsn::ERR_INVALID_PARAMETERS;
+        resp.__set_err_hint("invalid detect_action");
+        return;
+    }
+
+    if (dsn_unlikely(req.type != dsn::replication::hotkey_type::READ &&
+                     req.type != dsn::replication::hotkey_type::WRITE)) {
+        resp.err = dsn::ERR_INVALID_PARAMETERS;
+        resp.__set_err_hint("invalid hotkey_type");
+        return;
+    }
+
+    auto collector = req.type == dsn::replication::hotkey_type::READ ? 
_read_hotkey_collector
+                                                                     : 
_write_hotkey_collector;
+    collector->handle_rpc(req, resp);
+}
+
 } // namespace server
 } // namespace pegasus
diff --git a/src/server/pegasus_server_impl.h b/src/server/pegasus_server_impl.h
index 1db2077..15cf80a 100644
--- a/src/server/pegasus_server_impl.h
+++ b/src/server/pegasus_server_impl.h
@@ -28,6 +28,7 @@ namespace server {
 class meta_store;
 class capacity_unit_calculator;
 class pegasus_server_write;
+class hotkey_collector;
 
 class pegasus_server_impl : public pegasus_read_service
 {
@@ -318,6 +319,9 @@ private:
 
     ::dsn::error_code flush_all_family_columns(bool wait);
 
+    void on_detect_hotkey(const dsn::replication::detect_hotkey_request &req,
+                          dsn::replication::detect_hotkey_response &resp) 
override;
+
 private:
     static const std::chrono::seconds kServerStatUpdateTimeSec;
     static const std::string COMPRESSION_HEADER;
@@ -382,6 +386,9 @@ private:
 
     dsn::task_tracker _tracker;
 
+    std::shared_ptr<hotkey_collector> _read_hotkey_collector;
+    std::shared_ptr<hotkey_collector> _write_hotkey_collector;
+
     // perf counters
     ::dsn::perf_counter_wrapper _pfc_get_qps;
     ::dsn::perf_counter_wrapper _pfc_multi_get_qps;
diff --git a/src/server/pegasus_server_impl_init.cpp 
b/src/server/pegasus_server_impl_init.cpp
index b97d66c..71c6979 100644
--- a/src/server/pegasus_server_impl_init.cpp
+++ b/src/server/pegasus_server_impl_init.cpp
@@ -12,6 +12,7 @@
 #include "meta_store.h"
 #include "pegasus_event_listener.h"
 #include "pegasus_server_write.h"
+#include "hotkey_collector.h"
 
 namespace pegasus {
 namespace server {
@@ -42,6 +43,9 @@ 
pegasus_server_impl::pegasus_server_impl(dsn::replication::replica *r)
     _primary_address = dsn::rpc_address(dsn_primary_address()).to_string();
     _gpid = get_gpid();
 
+    _read_hotkey_collector = std::make_shared<hotkey_collector>();
+    _write_hotkey_collector = std::make_shared<hotkey_collector>();
+
     _verbose_log = dsn_config_get_value_bool("pegasus.server",
                                              "rocksdb_verbose_log",
                                              false,
diff --git a/src/server/test/CMakeLists.txt b/src/server/test/CMakeLists.txt
index 0184dbf..20f290b 100644
--- a/src/server/test/CMakeLists.txt
+++ b/src/server/test/CMakeLists.txt
@@ -10,6 +10,7 @@ set(MY_PROJ_SRC "../pegasus_server_impl.cpp"
                 "../pegasus_mutation_duplicator.cpp"
                 "../hotspot_partition_calculator.cpp"
                 "../meta_store.cpp"
+                "../hotkey_collector.cpp"
 )
 
 set(MY_SRC_SEARCH_MODE "GLOB")


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to