This is an automated email from the ASF dual-hosted git repository.
smityz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git
The following commit(s) were added to refs/heads/master by this push:
new 791e0fc feat: support read throttling by size (#829)
791e0fc is described below
commit 791e0fcf58d122dca075c7818aed8d15f296caeb
Author: Smilencer <[email protected]>
AuthorDate: Wed Nov 3 17:01:40 2021 +0800
feat: support read throttling by size (#829)
---
rdsn | 2 +-
src/base/pegasus_const.cpp | 2 +
src/base/pegasus_const.h | 2 +
src/server/capacity_unit_calculator.cpp | 10 +++-
src/server/capacity_unit_calculator.h | 17 +++++-
src/server/pegasus_server_impl.cpp | 72 ++++++++++++++++++++++-
src/server/pegasus_server_impl.h | 13 ++++
src/server/pegasus_server_impl_init.cpp | 8 +++
src/server/test/capacity_unit_calculator_test.cpp | 4 +-
9 files changed, 122 insertions(+), 8 deletions(-)
diff --git a/rdsn b/rdsn
index 3bcff61..fc41809 160000
--- a/rdsn
+++ b/rdsn
@@ -1 +1 @@
-Subproject commit 3bcff61f1103d2947c2fc9b723f81b720f342c27
+Subproject commit fc41809ce1622a47a535a2316df91d4d626f35ed
diff --git a/src/base/pegasus_const.cpp b/src/base/pegasus_const.cpp
index 1a72bc6..f8127ba 100644
--- a/src/base/pegasus_const.cpp
+++ b/src/base/pegasus_const.cpp
@@ -97,4 +97,6 @@ const std::string SPLIT_VALIDATE_PARTITION_HASH("replica.split.validate_partitio
/// json string which represents user specified compaction
const std::string USER_SPECIFIED_COMPACTION("user_specified_compaction");
+
+const std::string READ_SIZE_THROTTLING("replica.read_throttling_by_size");
} // namespace pegasus
diff --git a/src/base/pegasus_const.h b/src/base/pegasus_const.h
index c0eb973..bbe4193 100644
--- a/src/base/pegasus_const.h
+++ b/src/base/pegasus_const.h
@@ -68,4 +68,6 @@ extern const std::string ROCKSDB_BLOCK_CACHE_ENABLED;
extern const std::string SPLIT_VALIDATE_PARTITION_HASH;
extern const std::string USER_SPECIFIED_COMPACTION;
+
+extern const std::string READ_SIZE_THROTTLING;
} // namespace pegasus
diff --git a/src/server/capacity_unit_calculator.cpp b/src/server/capacity_unit_calculator.cpp
index 22d5ff0..5f978cd 100644
--- a/src/server/capacity_unit_calculator.cpp
+++ b/src/server/capacity_unit_calculator.cpp
@@ -20,6 +20,7 @@
#include "capacity_unit_calculator.h"
#include <dsn/utility/config_api.h>
+#include <dsn/utils/token_bucket_throttling_controller.h>
#include <rocksdb/status.h>
#include "hotkey_collector.h"
@@ -29,13 +30,17 @@ namespace server {
capacity_unit_calculator::capacity_unit_calculator(
replica_base *r,
std::shared_ptr<hotkey_collector> read_hotkey_collector,
- std::shared_ptr<hotkey_collector> write_hotkey_collector)
+ std::shared_ptr<hotkey_collector> write_hotkey_collector,
+ std::shared_ptr<throttling_controller> read_size_throttling_controller)
: replica_base(r),
_read_hotkey_collector(read_hotkey_collector),
- _write_hotkey_collector(write_hotkey_collector)
+ _write_hotkey_collector(write_hotkey_collector),
+ _read_size_throttling_controller(read_size_throttling_controller)
{
dassert(_read_hotkey_collector != nullptr, "read hotkey collector is a nullptr");
dassert(_write_hotkey_collector != nullptr, "write hotkey collector is a nullptr");
+ dassert(_read_size_throttling_controller != nullptr,
+ "_read_size_throttling_controller is a nullptr");
_read_capacity_unit_size =
dsn_config_get_value_uint64("pegasus.server",
@@ -106,6 +111,7 @@ int64_t capacity_unit_calculator::add_read_cu(int64_t read_data_size)
? (read_data_size + _read_capacity_unit_size - 1) >>
_log_read_cu_size
: 1;
_pfc_recent_read_cu->add(read_cu);
+ _read_size_throttling_controller->consume_token(read_data_size);
return read_cu;
}
diff --git a/src/server/capacity_unit_calculator.h
b/src/server/capacity_unit_calculator.h
index ab0f662..f7224a2 100644
--- a/src/server/capacity_unit_calculator.h
+++ b/src/server/capacity_unit_calculator.h
@@ -23,6 +23,13 @@
#include <dsn/perf_counter/perf_counter_wrapper.h>
#include <rrdb/rrdb_types.h>
+namespace dsn {
+namespace utils {
+class token_bucket_throttling_controller;
+} // namespace utils
+} // namespace dsn
+typedef dsn::utils::token_bucket_throttling_controller throttling_controller;
+
namespace pegasus {
namespace server {
@@ -31,9 +38,11 @@ class hotkey_collector;
class capacity_unit_calculator : public dsn::replication::replica_base
{
public:
- capacity_unit_calculator(replica_base *r,
- std::shared_ptr<hotkey_collector> read_hotkey_collector,
- std::shared_ptr<hotkey_collector> write_hotkey_collector);
+ capacity_unit_calculator(
+ replica_base *r,
+ std::shared_ptr<hotkey_collector> read_hotkey_collector,
+ std::shared_ptr<hotkey_collector> write_hotkey_collector,
+ std::shared_ptr<throttling_controller> _read_size_throttling_controller);
virtual ~capacity_unit_calculator() = default;
@@ -119,6 +128,8 @@ private:
*/
std::shared_ptr<hotkey_collector> _read_hotkey_collector;
std::shared_ptr<hotkey_collector> _write_hotkey_collector;
+
+ std::shared_ptr<throttling_controller> _read_size_throttling_controller;
};
} // namespace server
diff --git a/src/server/pegasus_server_impl.cpp
b/src/server/pegasus_server_impl.cpp
index 34ef06e..e51195f 100644
--- a/src/server/pegasus_server_impl.cpp
+++ b/src/server/pegasus_server_impl.cpp
@@ -31,6 +31,7 @@
#include <dsn/dist/fmt_logging.h>
#include <dsn/dist/replication/replication.codes.h>
#include <dsn/utility/flags.h>
+#include <dsn/utils/token_bucket_throttling_controller.h>
#include "base/pegasus_key_schema.h"
#include "base/pegasus_value_schema.h"
@@ -274,6 +275,12 @@ void pegasus_server_impl::on_get(get_rpc rpc)
resp.partition_index = _gpid.get_partition_index();
resp.server = _primary_address;
+ if (!_read_size_throttling_controller->available()) {
+ rpc.error() = dsn::ERR_BUSY;
+ _counter_recent_read_throttling_reject_count->increment();
+ return;
+ }
+
rocksdb::Slice skey(key.data(), key.length());
std::string value;
rocksdb::Status status = _db->Get(_data_cf_rd_opts, _data_cf, skey,
&value);
@@ -353,6 +360,12 @@ void pegasus_server_impl::on_multi_get(multi_get_rpc rpc)
resp.partition_index = _gpid.get_partition_index();
resp.server = _primary_address;
+ if (!_read_size_throttling_controller->available()) {
+ rpc.error() = dsn::ERR_BUSY;
+ _counter_recent_read_throttling_reject_count->increment();
+ return;
+ }
+
if (!is_filter_type_supported(request.sort_key_filter_type)) {
derror("%s: invalid argument for multi_get from %s: "
"sort key filter type %d not supported",
@@ -775,6 +788,11 @@ void
pegasus_server_impl::on_sortkey_count(sortkey_count_rpc rpc)
resp.app_id = _gpid.get_app_id();
resp.partition_index = _gpid.get_partition_index();
resp.server = _primary_address;
+ if (!_read_size_throttling_controller->available()) {
+ rpc.error() = dsn::ERR_BUSY;
+ _counter_recent_read_throttling_reject_count->increment();
+ return;
+ }
// scan
::dsn::blob start_key, stop_key;
@@ -852,6 +870,12 @@ void pegasus_server_impl::on_ttl(ttl_rpc rpc)
resp.partition_index = _gpid.get_partition_index();
resp.server = _primary_address;
+ if (!_read_size_throttling_controller->available()) {
+ rpc.error() = dsn::ERR_BUSY;
+ _counter_recent_read_throttling_reject_count->increment();
+ return;
+ }
+
rocksdb::Slice skey(key.data(), key.length());
std::string value;
rocksdb::Status status = _db->Get(_data_cf_rd_opts, _data_cf, skey,
&value);
@@ -916,6 +940,12 @@ void pegasus_server_impl::on_get_scanner(get_scanner_rpc
rpc)
resp.partition_index = _gpid.get_partition_index();
resp.server = _primary_address;
+ if (!_read_size_throttling_controller->available()) {
+ rpc.error() = dsn::ERR_BUSY;
+ _counter_recent_read_throttling_reject_count->increment();
+ return;
+ }
+
if (!is_filter_type_supported(request.hash_key_filter_type)) {
derror("%s: invalid argument for get_scanner from %s: "
"hash key filter type %d not supported",
@@ -1166,6 +1196,12 @@ void pegasus_server_impl::on_scan(scan_rpc rpc)
resp.partition_index = _gpid.get_partition_index();
resp.server = _primary_address;
+ if (!_read_size_throttling_controller->available()) {
+ rpc.error() = dsn::ERR_BUSY;
+ _counter_recent_read_throttling_reject_count->increment();
+ return;
+ }
+
std::unique_ptr<pegasus_scan_context> context =
_context_cache.fetch(request.context_id);
if (context) {
rocksdb::Iterator *it = context->iterator.get();
@@ -1562,7 +1598,7 @@ void pegasus_server_impl::on_clear_scanner(const int64_t
&args) { _context_cache
// initialize cu calculator and write service after server being
initialized.
_cu_calculator = dsn::make_unique<capacity_unit_calculator>(
- this, _read_hotkey_collector, _write_hotkey_collector);
+ this, _read_hotkey_collector, _write_hotkey_collector,
_read_size_throttling_controller);
_server_write = dsn::make_unique<pegasus_server_write>(this, _verbose_log);
::dsn::tasking::enqueue_timer(LPC_ANALYZE_HOTKEY,
@@ -2421,6 +2457,8 @@ void pegasus_server_impl::update_app_envs(const
std::map<std::string, std::strin
update_validate_partition_hash(envs);
update_user_specified_compaction(envs);
_manual_compact_svc.start_manual_compact_if_needed(envs);
+
+ update_throttling_controller(envs);
}
int64_t pegasus_server_impl::last_flushed_decree() const
@@ -2519,6 +2557,38 @@ void
pegasus_server_impl::update_checkpoint_reserve(const std::map<std::string,
}
}
+void pegasus_server_impl::update_throttling_controller(
+ const std::map<std::string, std::string> &envs)
+{
+ bool throttling_changed = false;
+ std::string old_throttling;
+ std::string parse_error;
+ auto find = envs.find(READ_SIZE_THROTTLING);
+ if (find != envs.end()) {
+ if (!_read_size_throttling_controller->parse_from_env(find->second,
+ get_app_info()->partition_count,
+ parse_error,
+ throttling_changed,
+ old_throttling)) {
+ dwarn_replica("parse env failed, key = \"{}\", value = \"{}\", error = \"{}\"",
+ READ_SIZE_THROTTLING,
+ find->second,
+ parse_error);
+ // reset if parse failed
+ _read_size_throttling_controller->reset(throttling_changed, old_throttling);
+ }
+ } else {
+ // reset if env not found
+ _read_size_throttling_controller->reset(throttling_changed, old_throttling);
+ }
+ if (throttling_changed) {
+ ddebug_replica("switch {} from \"{}\" to \"{}\"",
+ READ_SIZE_THROTTLING,
+ old_throttling,
+ _read_size_throttling_controller->env_value());
+ }
+}
+
void pegasus_server_impl::update_slow_query_threshold(
const std::map<std::string, std::string> &envs)
{
diff --git a/src/server/pegasus_server_impl.h b/src/server/pegasus_server_impl.h
index f7a68cf..2c4e0f9 100644
--- a/src/server/pegasus_server_impl.h
+++ b/src/server/pegasus_server_impl.h
@@ -37,6 +37,13 @@
#include "range_read_limiter.h"
#include "pegasus_read_service.h"
+namespace dsn {
+namespace utils {
+class token_bucket_throttling_controller;
+} // namespace utils
+} // namespace dsn
+typedef dsn::utils::token_bucket_throttling_controller throttling_controller;
+
namespace pegasus {
namespace server {
@@ -275,6 +282,8 @@ private:
void update_user_specified_compaction(const std::map<std::string,
std::string> &envs);
+ void update_throttling_controller(const std::map<std::string, std::string> &envs);
+
// return true if parse compression types 'config' success, otherwise
return false.
// 'compression_per_level' will not be changed if parse failed.
bool parse_compression_types(const std::string &config,
@@ -426,6 +435,8 @@ private:
std::shared_ptr<hotkey_collector> _read_hotkey_collector;
std::shared_ptr<hotkey_collector> _write_hotkey_collector;
+ std::shared_ptr<throttling_controller> _read_size_throttling_controller;
+
// perf counters
::dsn::perf_counter_wrapper _pfc_get_qps;
::dsn::perf_counter_wrapper _pfc_multi_get_qps;
@@ -464,6 +475,8 @@ private:
dsn::perf_counter_wrapper _pfc_rdb_l0_hit_count;
dsn::perf_counter_wrapper _pfc_rdb_l1_hit_count;
dsn::perf_counter_wrapper _pfc_rdb_l2andup_hit_count;
+
+ dsn::perf_counter_wrapper _counter_recent_read_throttling_reject_count;
};
} // namespace server
diff --git a/src/server/pegasus_server_impl_init.cpp
b/src/server/pegasus_server_impl_init.cpp
index 6b38e63..2aca7d5 100644
--- a/src/server/pegasus_server_impl_init.cpp
+++ b/src/server/pegasus_server_impl_init.cpp
@@ -22,6 +22,7 @@
#include <unordered_map>
#include <dsn/utility/flags.h>
#include <rocksdb/filter_policy.h>
+#include <dsn/utils/token_bucket_throttling_controller.h>
#include "capacity_unit_calculator.h"
#include "hashkey_transform.h"
@@ -109,6 +110,9 @@
pegasus_server_impl::pegasus_server_impl(dsn::replication::replica *r)
_write_hotkey_collector =
std::make_shared<hotkey_collector>(dsn::replication::hotkey_type::WRITE, this);
+ _read_size_throttling_controller =
+ std::make_shared<dsn::utils::token_bucket_throttling_controller>();
+
_verbose_log = dsn_config_get_value_bool("pegasus.server",
"rocksdb_verbose_log",
false,
@@ -715,6 +719,10 @@
pegasus_server_impl::pegasus_server_impl(dsn::replication::replica *r)
"statistics the number of
times bloom FullFilter "
"has not avoided the reads
and data actually "
"exist");
+
+ auto counter_str = fmt::format("recent.read.throttling.reject.count@{}", str_gpid.c_str());
+ _counter_recent_read_throttling_reject_count.init_app_counter(
+ "eon.replica", counter_str.c_str(), COUNTER_TYPE_VOLATILE_NUMBER, counter_str.c_str());
}
} // namespace server
} // namespace pegasus
diff --git a/src/server/test/capacity_unit_calculator_test.cpp
b/src/server/test/capacity_unit_calculator_test.cpp
index ef8f39f..2d1c262 100644
--- a/src/server/test/capacity_unit_calculator_test.cpp
+++ b/src/server/test/capacity_unit_calculator_test.cpp
@@ -21,6 +21,7 @@
#include "server/capacity_unit_calculator.h"
#include <dsn/dist/replication/replica_base.h>
+#include <dsn/utils/token_bucket_throttling_controller.h>
#include "pegasus_key_schema.h"
#include "server/hotkey_collector.h"
@@ -53,7 +54,8 @@ public:
: capacity_unit_calculator(
r,
std::make_shared<hotkey_collector>(dsn::replication::hotkey_type::READ, r),
- std::make_shared<hotkey_collector>(dsn::replication::hotkey_type::WRITE, r))
+ std::make_shared<hotkey_collector>(dsn::replication::hotkey_type::WRITE, r),
+ std::make_shared<dsn::utils::token_bucket_throttling_controller>())
{
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]