This is an automated email from the ASF dual-hosted git repository.

smityz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git


The following commit(s) were added to refs/heads/master by this push:
     new 791e0fc  feat: support read throttling by size (#829)
791e0fc is described below

commit 791e0fcf58d122dca075c7818aed8d15f296caeb
Author: Smilencer <[email protected]>
AuthorDate: Wed Nov 3 17:01:40 2021 +0800

    feat: support read throttling by size (#829)
---
 rdsn                                              |  2 +-
 src/base/pegasus_const.cpp                        |  2 +
 src/base/pegasus_const.h                          |  2 +
 src/server/capacity_unit_calculator.cpp           | 10 +++-
 src/server/capacity_unit_calculator.h             | 17 +++++-
 src/server/pegasus_server_impl.cpp                | 72 ++++++++++++++++++++++-
 src/server/pegasus_server_impl.h                  | 13 ++++
 src/server/pegasus_server_impl_init.cpp           |  8 +++
 src/server/test/capacity_unit_calculator_test.cpp |  4 +-
 9 files changed, 122 insertions(+), 8 deletions(-)

diff --git a/rdsn b/rdsn
index 3bcff61..fc41809 160000
--- a/rdsn
+++ b/rdsn
@@ -1 +1 @@
-Subproject commit 3bcff61f1103d2947c2fc9b723f81b720f342c27
+Subproject commit fc41809ce1622a47a535a2316df91d4d626f35ed
diff --git a/src/base/pegasus_const.cpp b/src/base/pegasus_const.cpp
index 1a72bc6..f8127ba 100644
--- a/src/base/pegasus_const.cpp
+++ b/src/base/pegasus_const.cpp
@@ -97,4 +97,6 @@ const std::string 
SPLIT_VALIDATE_PARTITION_HASH("replica.split.validate_partitio
 
 /// json string which represents user specified compaction
 const std::string USER_SPECIFIED_COMPACTION("user_specified_compaction");
+
+const std::string READ_SIZE_THROTTLING("replica.read_throttling_by_size");
 } // namespace pegasus
diff --git a/src/base/pegasus_const.h b/src/base/pegasus_const.h
index c0eb973..bbe4193 100644
--- a/src/base/pegasus_const.h
+++ b/src/base/pegasus_const.h
@@ -68,4 +68,6 @@ extern const std::string ROCKSDB_BLOCK_CACHE_ENABLED;
 extern const std::string SPLIT_VALIDATE_PARTITION_HASH;
 
 extern const std::string USER_SPECIFIED_COMPACTION;
+
+extern const std::string READ_SIZE_THROTTLING;
 } // namespace pegasus
diff --git a/src/server/capacity_unit_calculator.cpp 
b/src/server/capacity_unit_calculator.cpp
index 22d5ff0..5f978cd 100644
--- a/src/server/capacity_unit_calculator.cpp
+++ b/src/server/capacity_unit_calculator.cpp
@@ -20,6 +20,7 @@
 #include "capacity_unit_calculator.h"
 
 #include <dsn/utility/config_api.h>
+#include <dsn/utils/token_bucket_throttling_controller.h>
 #include <rocksdb/status.h>
 #include "hotkey_collector.h"
 
@@ -29,13 +30,17 @@ namespace server {
 capacity_unit_calculator::capacity_unit_calculator(
     replica_base *r,
     std::shared_ptr<hotkey_collector> read_hotkey_collector,
-    std::shared_ptr<hotkey_collector> write_hotkey_collector)
+    std::shared_ptr<hotkey_collector> write_hotkey_collector,
+    std::shared_ptr<throttling_controller> read_size_throttling_controller)
     : replica_base(r),
       _read_hotkey_collector(read_hotkey_collector),
-      _write_hotkey_collector(write_hotkey_collector)
+      _write_hotkey_collector(write_hotkey_collector),
+      _read_size_throttling_controller(read_size_throttling_controller)
 {
     dassert(_read_hotkey_collector != nullptr, "read hotkey collector is a 
nullptr");
     dassert(_write_hotkey_collector != nullptr, "write hotkey collector is a 
nullptr");
+    dassert(_read_size_throttling_controller != nullptr,
+            "_read_size_throttling_controller is a nullptr");
 
     _read_capacity_unit_size =
         dsn_config_get_value_uint64("pegasus.server",
@@ -106,6 +111,7 @@ int64_t capacity_unit_calculator::add_read_cu(int64_t 
read_data_size)
                           ? (read_data_size + _read_capacity_unit_size - 1) >> 
_log_read_cu_size
                           : 1;
     _pfc_recent_read_cu->add(read_cu);
+    _read_size_throttling_controller->consume_token(read_data_size);
     return read_cu;
 }
 
diff --git a/src/server/capacity_unit_calculator.h 
b/src/server/capacity_unit_calculator.h
index ab0f662..f7224a2 100644
--- a/src/server/capacity_unit_calculator.h
+++ b/src/server/capacity_unit_calculator.h
@@ -23,6 +23,13 @@
 #include <dsn/perf_counter/perf_counter_wrapper.h>
 #include <rrdb/rrdb_types.h>
 
+namespace dsn {
+namespace utils {
+class token_bucket_throttling_controller;
+} // namespace utils
+} // namespace dsn
+typedef dsn::utils::token_bucket_throttling_controller throttling_controller;
+
 namespace pegasus {
 namespace server {
 
@@ -31,9 +38,11 @@ class hotkey_collector;
 class capacity_unit_calculator : public dsn::replication::replica_base
 {
 public:
-    capacity_unit_calculator(replica_base *r,
-                             std::shared_ptr<hotkey_collector> 
read_hotkey_collector,
-                             std::shared_ptr<hotkey_collector> 
write_hotkey_collector);
+    capacity_unit_calculator(
+        replica_base *r,
+        std::shared_ptr<hotkey_collector> read_hotkey_collector,
+        std::shared_ptr<hotkey_collector> write_hotkey_collector,
+        std::shared_ptr<throttling_controller> 
_read_size_throttling_controller);
 
     virtual ~capacity_unit_calculator() = default;
 
@@ -119,6 +128,8 @@ private:
     */
     std::shared_ptr<hotkey_collector> _read_hotkey_collector;
     std::shared_ptr<hotkey_collector> _write_hotkey_collector;
+
+    std::shared_ptr<throttling_controller> _read_size_throttling_controller;
 };
 
 } // namespace server
diff --git a/src/server/pegasus_server_impl.cpp 
b/src/server/pegasus_server_impl.cpp
index 34ef06e..e51195f 100644
--- a/src/server/pegasus_server_impl.cpp
+++ b/src/server/pegasus_server_impl.cpp
@@ -31,6 +31,7 @@
 #include <dsn/dist/fmt_logging.h>
 #include <dsn/dist/replication/replication.codes.h>
 #include <dsn/utility/flags.h>
+#include <dsn/utils/token_bucket_throttling_controller.h>
 
 #include "base/pegasus_key_schema.h"
 #include "base/pegasus_value_schema.h"
@@ -274,6 +275,12 @@ void pegasus_server_impl::on_get(get_rpc rpc)
     resp.partition_index = _gpid.get_partition_index();
     resp.server = _primary_address;
 
+    if (!_read_size_throttling_controller->available()) {
+        rpc.error() = dsn::ERR_BUSY;
+        _counter_recent_read_throttling_reject_count->increment();
+        return;
+    }
+
     rocksdb::Slice skey(key.data(), key.length());
     std::string value;
     rocksdb::Status status = _db->Get(_data_cf_rd_opts, _data_cf, skey, 
&value);
@@ -353,6 +360,12 @@ void pegasus_server_impl::on_multi_get(multi_get_rpc rpc)
     resp.partition_index = _gpid.get_partition_index();
     resp.server = _primary_address;
 
+    if (!_read_size_throttling_controller->available()) {
+        rpc.error() = dsn::ERR_BUSY;
+        _counter_recent_read_throttling_reject_count->increment();
+        return;
+    }
+
     if (!is_filter_type_supported(request.sort_key_filter_type)) {
         derror("%s: invalid argument for multi_get from %s: "
                "sort key filter type %d not supported",
@@ -775,6 +788,11 @@ void 
pegasus_server_impl::on_sortkey_count(sortkey_count_rpc rpc)
     resp.app_id = _gpid.get_app_id();
     resp.partition_index = _gpid.get_partition_index();
     resp.server = _primary_address;
+    if (!_read_size_throttling_controller->available()) {
+        rpc.error() = dsn::ERR_BUSY;
+        _counter_recent_read_throttling_reject_count->increment();
+        return;
+    }
 
     // scan
     ::dsn::blob start_key, stop_key;
@@ -852,6 +870,12 @@ void pegasus_server_impl::on_ttl(ttl_rpc rpc)
     resp.partition_index = _gpid.get_partition_index();
     resp.server = _primary_address;
 
+    if (!_read_size_throttling_controller->available()) {
+        rpc.error() = dsn::ERR_BUSY;
+        _counter_recent_read_throttling_reject_count->increment();
+        return;
+    }
+
     rocksdb::Slice skey(key.data(), key.length());
     std::string value;
     rocksdb::Status status = _db->Get(_data_cf_rd_opts, _data_cf, skey, 
&value);
@@ -916,6 +940,12 @@ void pegasus_server_impl::on_get_scanner(get_scanner_rpc 
rpc)
     resp.partition_index = _gpid.get_partition_index();
     resp.server = _primary_address;
 
+    if (!_read_size_throttling_controller->available()) {
+        rpc.error() = dsn::ERR_BUSY;
+        _counter_recent_read_throttling_reject_count->increment();
+        return;
+    }
+
     if (!is_filter_type_supported(request.hash_key_filter_type)) {
         derror("%s: invalid argument for get_scanner from %s: "
                "hash key filter type %d not supported",
@@ -1166,6 +1196,12 @@ void pegasus_server_impl::on_scan(scan_rpc rpc)
     resp.partition_index = _gpid.get_partition_index();
     resp.server = _primary_address;
 
+    if (!_read_size_throttling_controller->available()) {
+        rpc.error() = dsn::ERR_BUSY;
+        _counter_recent_read_throttling_reject_count->increment();
+        return;
+    }
+
     std::unique_ptr<pegasus_scan_context> context = 
_context_cache.fetch(request.context_id);
     if (context) {
         rocksdb::Iterator *it = context->iterator.get();
@@ -1562,7 +1598,7 @@ void pegasus_server_impl::on_clear_scanner(const int64_t 
&args) { _context_cache
 
     // initialize cu calculator and write service after server being 
initialized.
     _cu_calculator = dsn::make_unique<capacity_unit_calculator>(
-        this, _read_hotkey_collector, _write_hotkey_collector);
+        this, _read_hotkey_collector, _write_hotkey_collector, 
_read_size_throttling_controller);
     _server_write = dsn::make_unique<pegasus_server_write>(this, _verbose_log);
 
     ::dsn::tasking::enqueue_timer(LPC_ANALYZE_HOTKEY,
@@ -2421,6 +2457,8 @@ void pegasus_server_impl::update_app_envs(const 
std::map<std::string, std::strin
     update_validate_partition_hash(envs);
     update_user_specified_compaction(envs);
     _manual_compact_svc.start_manual_compact_if_needed(envs);
+
+    update_throttling_controller(envs);
 }
 
 int64_t pegasus_server_impl::last_flushed_decree() const
@@ -2519,6 +2557,38 @@ void 
pegasus_server_impl::update_checkpoint_reserve(const std::map<std::string,
     }
 }
 
+void pegasus_server_impl::update_throttling_controller(
+    const std::map<std::string, std::string> &envs)
+{
+    bool throttling_changed = false;
+    std::string old_throttling;
+    std::string parse_error;
+    auto find = envs.find(READ_SIZE_THROTTLING);
+    if (find != envs.end()) {
+        if (!_read_size_throttling_controller->parse_from_env(find->second,
+                                                              
get_app_info()->partition_count,
+                                                              parse_error,
+                                                              
throttling_changed,
+                                                              old_throttling)) 
{
+            dwarn_replica("parse env failed, key = \"{}\", value = \"{}\", 
error = \"{}\"",
+                          READ_SIZE_THROTTLING,
+                          find->second,
+                          parse_error);
+            // reset if parse failed
+            _read_size_throttling_controller->reset(throttling_changed, 
old_throttling);
+        }
+    } else {
+        // reset if env not found
+        _read_size_throttling_controller->reset(throttling_changed, 
old_throttling);
+    }
+    if (throttling_changed) {
+        ddebug_replica("switch {} from \"{}\" to \"{}\"",
+                       READ_SIZE_THROTTLING,
+                       old_throttling,
+                       _read_size_throttling_controller->env_value());
+    }
+}
+
 void pegasus_server_impl::update_slow_query_threshold(
     const std::map<std::string, std::string> &envs)
 {
diff --git a/src/server/pegasus_server_impl.h b/src/server/pegasus_server_impl.h
index f7a68cf..2c4e0f9 100644
--- a/src/server/pegasus_server_impl.h
+++ b/src/server/pegasus_server_impl.h
@@ -37,6 +37,13 @@
 #include "range_read_limiter.h"
 #include "pegasus_read_service.h"
 
+namespace dsn {
+namespace utils {
+class token_bucket_throttling_controller;
+} // namespace utils
+} // namespace dsn
+typedef dsn::utils::token_bucket_throttling_controller throttling_controller;
+
 namespace pegasus {
 namespace server {
 
@@ -275,6 +282,8 @@ private:
 
     void update_user_specified_compaction(const std::map<std::string, 
std::string> &envs);
 
+    void update_throttling_controller(const std::map<std::string, std::string> 
&envs);
+
     // return true if parse compression types 'config' success, otherwise 
return false.
     // 'compression_per_level' will not be changed if parse failed.
     bool parse_compression_types(const std::string &config,
@@ -426,6 +435,8 @@ private:
     std::shared_ptr<hotkey_collector> _read_hotkey_collector;
     std::shared_ptr<hotkey_collector> _write_hotkey_collector;
 
+    std::shared_ptr<throttling_controller> _read_size_throttling_controller;
+
     // perf counters
     ::dsn::perf_counter_wrapper _pfc_get_qps;
     ::dsn::perf_counter_wrapper _pfc_multi_get_qps;
@@ -464,6 +475,8 @@ private:
     dsn::perf_counter_wrapper _pfc_rdb_l0_hit_count;
     dsn::perf_counter_wrapper _pfc_rdb_l1_hit_count;
     dsn::perf_counter_wrapper _pfc_rdb_l2andup_hit_count;
+
+    dsn::perf_counter_wrapper _counter_recent_read_throttling_reject_count;
 };
 
 } // namespace server
diff --git a/src/server/pegasus_server_impl_init.cpp 
b/src/server/pegasus_server_impl_init.cpp
index 6b38e63..2aca7d5 100644
--- a/src/server/pegasus_server_impl_init.cpp
+++ b/src/server/pegasus_server_impl_init.cpp
@@ -22,6 +22,7 @@
 #include <unordered_map>
 #include <dsn/utility/flags.h>
 #include <rocksdb/filter_policy.h>
+#include <dsn/utils/token_bucket_throttling_controller.h>
 
 #include "capacity_unit_calculator.h"
 #include "hashkey_transform.h"
@@ -109,6 +110,9 @@ 
pegasus_server_impl::pegasus_server_impl(dsn::replication::replica *r)
     _write_hotkey_collector =
         
std::make_shared<hotkey_collector>(dsn::replication::hotkey_type::WRITE, this);
 
+    _read_size_throttling_controller =
+        std::make_shared<dsn::utils::token_bucket_throttling_controller>();
+
     _verbose_log = dsn_config_get_value_bool("pegasus.server",
                                              "rocksdb_verbose_log",
                                              false,
@@ -715,6 +719,10 @@ 
pegasus_server_impl::pegasus_server_impl(dsn::replication::replica *r)
                                                  "statistics the number of 
times bloom FullFilter "
                                                  "has not avoided the reads 
and data actually "
                                                  "exist");
+
+    auto counter_str = fmt::format("recent.read.throttling.reject.count@{}", 
str_gpid.c_str());
+    _counter_recent_read_throttling_reject_count.init_app_counter(
+        "eon.replica", counter_str.c_str(), COUNTER_TYPE_VOLATILE_NUMBER, 
counter_str.c_str());
 }
 } // namespace server
 } // namespace pegasus
diff --git a/src/server/test/capacity_unit_calculator_test.cpp 
b/src/server/test/capacity_unit_calculator_test.cpp
index ef8f39f..2d1c262 100644
--- a/src/server/test/capacity_unit_calculator_test.cpp
+++ b/src/server/test/capacity_unit_calculator_test.cpp
@@ -21,6 +21,7 @@
 #include "server/capacity_unit_calculator.h"
 
 #include <dsn/dist/replication/replica_base.h>
+#include <dsn/utils/token_bucket_throttling_controller.h>
 #include "pegasus_key_schema.h"
 #include "server/hotkey_collector.h"
 
@@ -53,7 +54,8 @@ public:
         : capacity_unit_calculator(
               r,
               
std::make_shared<hotkey_collector>(dsn::replication::hotkey_type::READ, r),
-              
std::make_shared<hotkey_collector>(dsn::replication::hotkey_type::WRITE, r))
+              
std::make_shared<hotkey_collector>(dsn::replication::hotkey_type::WRITE, r),
+              
std::make_shared<dsn::utils::token_bucket_throttling_controller>())
     {
     }
 

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to