This is an automated email from the ASF dual-hosted git repository.
yuchenhe pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git
The following commit(s) were added to refs/heads/master by this push:
new 3b96b26 feat(hotkey): build a fundamental framework of hotkey
detection (#603)
3b96b26 is described below
commit 3b96b264f48906500ac276f4f9194adf6867837a
Author: Smilencer <[email protected]>
AuthorDate: Mon Oct 12 15:59:09 2020 +0800
feat(hotkey): build a fundamental framework of hotkey detection (#603)
---
rdsn | 2 +-
src/server/hotkey_collector.cpp | 37 ++++++++++++++++
src/server/hotkey_collector.h | 75 +++++++++++++++++++++++++++++++++
src/server/pegasus_server_impl.cpp | 24 +++++++++++
src/server/pegasus_server_impl.h | 7 +++
src/server/pegasus_server_impl_init.cpp | 4 ++
src/server/test/CMakeLists.txt | 1 +
7 files changed, 149 insertions(+), 1 deletion(-)
diff --git a/rdsn b/rdsn
index 69102a7..ccecd59 160000
--- a/rdsn
+++ b/rdsn
@@ -1 +1 @@
-Subproject commit 69102a786f3b888155bc18b8b6c58031c7d2fd98
+Subproject commit ccecd5996a7b5c5b3281860186b30048a2ee4763
diff --git a/src/server/hotkey_collector.cpp b/src/server/hotkey_collector.cpp
new file mode 100644
index 0000000..c05e33c
--- /dev/null
+++ b/src/server/hotkey_collector.cpp
@@ -0,0 +1,37 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "hotkey_collector.h"
+
+namespace pegasus {
+namespace server {
+
+// TODO: (Tangyanzhao) implement these functions
+// Entry point for a detect-hotkey RPC routed to this collector.
+// Placeholder only: the body is empty, so `req` is not inspected and `resp`
+// is left exactly as the caller passed it in.
+void hotkey_collector::handle_rpc(const dsn::replication::detect_hotkey_request &req,
+                                  dsn::replication::detect_hotkey_response &resp)
+{
+}
+
+// Records one access to `raw_key`, carrying the given `weight`.
+// Placeholder only: nothing is captured yet.
+void hotkey_collector::capture_raw_key(const dsn::blob &raw_key, int64_t weight)
+{
+    // TODO: (Tangyanzhao) add a check that verifies the blob really is a raw key
+}
+
+// Records one access to `hash_key`, carrying the given `weight`.
+// Placeholder only: the body is intentionally empty for now.
+void hotkey_collector::capture_hash_key(const dsn::blob &hash_key, int64_t weight)
+{
+}
+
+} // namespace server
+} // namespace pegasus
diff --git a/src/server/hotkey_collector.h b/src/server/hotkey_collector.h
new file mode 100644
index 0000000..57c7482
--- /dev/null
+++ b/src/server/hotkey_collector.h
@@ -0,0 +1,75 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <dsn/utility/string_view.h>
+#include <dsn/dist/replication/replication_types.h>
+
+namespace pegasus {
+namespace server {
+
+// hotkey_collector is responsible for finding the hot keys once a partition
+// has been detected to be hot. The two types of hotkey, READ and WRITE, are
+// detected separately.
+//
+// +--------------------+
+----------------------------------------------------+
+// | Replica server | | Hotkey collector
|
+// | | |
+-----------------------------------------------+ |
+// | +----------------+ | | | Coarse capture
| |
+// | | | |--> | +----------+
| |
+// | | RPC received | || | | | Data |
| |
+// | | | || | | +-----+----+
| |
+// | +-------+--------+ || | | |
| |
+// | | || | | +---------------+----v--+-------+---------+
| |
+// | v || | | | |Hot | | | |
| |
+// | +-------+--------+ || | | |Bucket |Bucket |Bucket |Bucket |Bucket |
| |
+// | | Replication | || | | +-----------+-----------------------------+
| |
+// | | (only on the | || | | |
| |
+// | | write path) | || |
+--------------|--------------------------------+ |
+// | +-------+--------+ || | +--v---+
|
+// | | || | | Data |
|
+// | v || | +------+
|
+// | +-------+--------+ || | +-----|-------+-------------+
|
+// | | | || |
+------|-------------|-------------|---------+ |
+// | | Capture data ---| | | Fine |capture | |
| |
+// | | | | | | | | |
| |
+// | +-------+--------+ | | | +----v----+ +----v----+ +----v----+
| |
+// | | | | | | queue | | queue | | queue |
| |
+// | v | | | +----+----+ +----+----+ +----+----+
| |
+// | +-------+--------+ | | | | | |
| |
+// | | | | | | +----v-------------v-------------v------+
| |
+// | | Place data | | | | | Analysis pool |
| |
+// | | to the disk | | | | +-----------------|---------------------+
| |
+// | | | | |
+-------------------|------------------------+ |
+// | +----------------+ | | v
|
+// | | | Hotkey
|
+// +--------------------+
+----------------------------------------------------+
+
+class hotkey_collector
+{
+public:
+    // Serves a detect-hotkey request; `resp` is the out-parameter handed back
+    // to the caller.
+    void handle_rpc(const dsn::replication::detect_hotkey_request &req,
+                    /*out*/ dsn::replication::detect_hotkey_response &resp);
+
+    // Capture hooks, one per key form (raw key / hash key).
+    // weight: calculate the weight according to the specific situation
+    // TODO: (Tangyanzhao) capture_*_key should be consistent with hotspot detection
+    void capture_raw_key(const dsn::blob &raw_key, int64_t weight);
+    void capture_hash_key(const dsn::blob &hash_key, int64_t weight);
+};
+
+} // namespace server
+} // namespace pegasus
diff --git a/src/server/pegasus_server_impl.cpp
b/src/server/pegasus_server_impl.cpp
index 577c907..122c638 100644
--- a/src/server/pegasus_server_impl.cpp
+++ b/src/server/pegasus_server_impl.cpp
@@ -22,6 +22,7 @@
#include "capacity_unit_calculator.h"
#include "pegasus_server_write.h"
#include "meta_store.h"
+#include "hotkey_collector.h"
using namespace dsn::literals::chrono_literals;
@@ -2798,5 +2799,28 @@ void
pegasus_server_impl::set_ingestion_status(dsn::replication::ingestion_statu
_ingestion_status = status;
}
+// Handles a detect-hotkey request for this replica.
+//
+// The request is rejected with ERR_INVALID_PARAMETERS, plus an explanatory
+// err_hint, unless `req.action` is START or STOP and `req.type` is READ or
+// WRITE. A valid request is forwarded unchanged, together with `resp`, to the
+// collector that matches `req.type`.
+void pegasus_server_impl::on_detect_hotkey(const dsn::replication::detect_hotkey_request &req,
+                                           dsn::replication::detect_hotkey_response &resp)
+{
+    if (dsn_unlikely(req.action != dsn::replication::detect_action::START &&
+                     req.action != dsn::replication::detect_action::STOP)) {
+        resp.err = dsn::ERR_INVALID_PARAMETERS;
+        resp.__set_err_hint("invalid detect_action");
+        return;
+    }
+
+    if (dsn_unlikely(req.type != dsn::replication::hotkey_type::READ &&
+                     req.type != dsn::replication::hotkey_type::WRITE)) {
+        resp.err = dsn::ERR_INVALID_PARAMETERS;
+        resp.__set_err_hint("invalid hotkey_type");
+        return;
+    }
+
+    // Bind by reference instead of copying the shared_ptr: both operands are
+    // member lvalues, and no extra ownership is needed just to make this call,
+    // so the atomic ref-count increment/decrement of a copy would be wasted.
+    const auto &collector = req.type == dsn::replication::hotkey_type::READ
+                                ? _read_hotkey_collector
+                                : _write_hotkey_collector;
+    collector->handle_rpc(req, resp);
+}
+
} // namespace server
} // namespace pegasus
diff --git a/src/server/pegasus_server_impl.h b/src/server/pegasus_server_impl.h
index 1db2077..15cf80a 100644
--- a/src/server/pegasus_server_impl.h
+++ b/src/server/pegasus_server_impl.h
@@ -28,6 +28,7 @@ namespace server {
class meta_store;
class capacity_unit_calculator;
class pegasus_server_write;
+class hotkey_collector;
class pegasus_server_impl : public pegasus_read_service
{
@@ -318,6 +319,9 @@ private:
::dsn::error_code flush_all_family_columns(bool wait);
+ void on_detect_hotkey(const dsn::replication::detect_hotkey_request &req,
+ dsn::replication::detect_hotkey_response &resp)
override;
+
private:
static const std::chrono::seconds kServerStatUpdateTimeSec;
static const std::string COMPRESSION_HEADER;
@@ -382,6 +386,9 @@ private:
dsn::task_tracker _tracker;
+ std::shared_ptr<hotkey_collector> _read_hotkey_collector;
+ std::shared_ptr<hotkey_collector> _write_hotkey_collector;
+
// perf counters
::dsn::perf_counter_wrapper _pfc_get_qps;
::dsn::perf_counter_wrapper _pfc_multi_get_qps;
diff --git a/src/server/pegasus_server_impl_init.cpp
b/src/server/pegasus_server_impl_init.cpp
index b97d66c..71c6979 100644
--- a/src/server/pegasus_server_impl_init.cpp
+++ b/src/server/pegasus_server_impl_init.cpp
@@ -12,6 +12,7 @@
#include "meta_store.h"
#include "pegasus_event_listener.h"
#include "pegasus_server_write.h"
+#include "hotkey_collector.h"
namespace pegasus {
namespace server {
@@ -42,6 +43,9 @@
pegasus_server_impl::pegasus_server_impl(dsn::replication::replica *r)
_primary_address = dsn::rpc_address(dsn_primary_address()).to_string();
_gpid = get_gpid();
+ _read_hotkey_collector = std::make_shared<hotkey_collector>();
+ _write_hotkey_collector = std::make_shared<hotkey_collector>();
+
_verbose_log = dsn_config_get_value_bool("pegasus.server",
"rocksdb_verbose_log",
false,
diff --git a/src/server/test/CMakeLists.txt b/src/server/test/CMakeLists.txt
index 0184dbf..20f290b 100644
--- a/src/server/test/CMakeLists.txt
+++ b/src/server/test/CMakeLists.txt
@@ -10,6 +10,7 @@ set(MY_PROJ_SRC "../pegasus_server_impl.cpp"
"../pegasus_mutation_duplicator.cpp"
"../hotspot_partition_calculator.cpp"
"../meta_store.cpp"
+ "../hotkey_collector.cpp"
)
set(MY_SRC_SEARCH_MODE "GLOB")
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]