This is an automated email from the ASF dual-hosted git repository.
laiyingchun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git
The following commit(s) were added to refs/heads/master by this push:
new f2d850b feat(hotkey): capture data part2 - declare coarse collector
(#624)
f2d850b is described below
commit f2d850bfb242ea06d87b0ab0801f1504175f4d8c
Author: Smilencer <[email protected]>
AuthorDate: Tue Oct 27 14:34:44 2020 +0800
feat(hotkey): capture data part2 - declare coarse collector (#624)
---
src/server/hotkey_collector.cpp | 108 ++++++++++++++++++++++++++++++++++++++--
src/server/hotkey_collector.h | 28 +++++++++--
2 files changed, 129 insertions(+), 7 deletions(-)
diff --git a/src/server/hotkey_collector.cpp b/src/server/hotkey_collector.cpp
index 16984ee..2d81595 100644
--- a/src/server/hotkey_collector.cpp
+++ b/src/server/hotkey_collector.cpp
@@ -19,6 +19,8 @@
#include <dsn/dist/replication/replication_enums.h>
#include <dsn/utility/smart_pointers.h>
+#include <dsn/utility/flags.h>
+#include <boost/functional/hash.hpp>
#include "base/pegasus_key_schema.h"
#include <dsn/dist/fmt_logging.h>
#include <dsn/utility/flags.h>
@@ -26,18 +28,88 @@
namespace pegasus {
namespace server {
+DSN_DEFINE_int32("pegasus.server",
+ coarse_data_variance_threshold,
+ 3,
+ "the threshold of variance calculate to find the outliers");
+
+DSN_DEFINE_validator(coarse_data_variance_threshold,
+ [](int32_t threshold) -> bool { return (threshold >= 0);
});
+
+// TODO: (Tangyanzhao) add a limit to avoid changing when detecting
+DSN_DEFINE_int32("pegasus.server",
+ data_capture_hash_bucket_num,
+ 37,
+ "the number of data capture hash buckets");
+
+DSN_DEFINE_validator(data_capture_hash_bucket_num, [](int32_t bucket_num) ->
bool {
+ if (bucket_num < 3) {
+ return false;
+ }
+ // data_capture_hash_bucket_num should be a prime number
+ for (int i = 2; i <= bucket_num / i; i++) {
+ if (bucket_num % i == 0) {
+ return false;
+ }
+ }
+ return true;
+});
+
// Upper bound, in seconds, on how long a single hotkey capture may run; per the
// flag description, capturing stops once it is exceeded without finding a hotkey.
DSN_DEFINE_int32("pegasus.server",
                 max_seconds_to_detect_hotkey,
                 150,
                 "the max time (in seconds) allowed to capture hotkey, will stop if hotkey's not found");
// Find a hot (outlier) element with the 68–95–99.7 rule, the same algorithm as
// hotspot_partition_calculator::stat_histories_analyse.
//
// The largest element is the candidate. The mean and the sample standard
// deviation are computed over all *other* elements, and the candidate is
// reported as hot when it lies at least `threshold` standard deviations above
// that mean.
//
// captured_keys: per-bucket counters; at least 3 are needed because the sample
//                standard deviation of the n-1 remaining values divides by n-2.
// threshold:     minimum z-score for the candidate to count as hot.
// hot_index:     out-parameter; index of the hot element, or -1 if none.
// Returns true iff a hot element was found.
static bool
find_outlier_index(const std::vector<uint64_t> &captured_keys, int threshold, int &hot_index)
{
    hot_index = -1;
    // FLAGS_data_capture_hash_bucket_num is validated to be >= 3, so this only
    // guards misuse: with fewer values the deviation below would divide by zero
    // (a debug-only check would let release builds do exactly that).
    if (captured_keys.size() < 3) {
        return false;
    }
    const int data_size = static_cast<int>(captured_keys.size());

    // Compare the counters as uint64_t (an `int` maximum truncates large
    // counters) and start from element 0 so `candidate` is always a valid
    // index, even when every counter is zero.
    int candidate = 0;
    double table_captured_key_sum = 0;
    for (int i = 0; i < data_size; i++) {
        table_captured_key_sum += captured_keys[i];
        if (captured_keys[i] > captured_keys[candidate]) {
            candidate = i;
        }
    }
    const double hot_value = static_cast<double>(captured_keys[candidate]);

    // TODO: (Tangyanzhao) increase a judgment of table_captured_key_sum
    const double captured_keys_avg_count =
        (table_captured_key_sum - hot_value) / (data_size - 1);
    double squared_diff_sum = 0;
    for (int i = 0; i < data_size; i++) {
        if (i != candidate) {
            const double diff = captured_keys[i] - captured_keys_avg_count;
            squared_diff_sum += diff * diff;
        }
    }
    const double standard_deviation = sqrt(squared_diff_sum / (data_size - 2));

    bool is_hot;
    if (standard_deviation == 0) {
        // All other elements are equal: any excess over them is an unbounded
        // z-score, and no excess means nothing stands out. Decided explicitly
        // instead of relying on a floating-point division by zero.
        is_hot = hot_value > captured_keys_avg_count;
    } else {
        is_hot = (hot_value - captured_keys_avg_count) / standard_deviation >= threshold;
    }
    if (is_hot) {
        hot_index = candidate;
    }
    return is_hot;
}
+
+// TODO: (Tangyanzhao) replace it to xxhash
+static int get_bucket_id(dsn::string_view data)
+{
+ size_t hash_value = boost::hash_range(data.begin(), data.end());
+ return static_cast<int>(hash_value % FLAGS_data_capture_hash_bucket_num);
+}
+
hotkey_collector::hotkey_collector(dsn::replication::hotkey_type::type
hotkey_type,
dsn::replication::replica_base *r_base)
: replica_base(r_base),
_state(hotkey_collector_state::STOPPED),
_hotkey_type(hotkey_type),
- _internal_collector(std::make_shared<hotkey_empty_data_collector>()),
+ _internal_collector(std::make_shared<hotkey_empty_data_collector>(this)),
_collector_start_time_second(0)
{
}
@@ -78,7 +150,11 @@ void hotkey_collector::analyse_data()
switch (_state.load()) {
case hotkey_collector_state::COARSE_DETECTING:
if (!terminate_if_timeout()) {
- _internal_collector->analyse_data();
+ _internal_collector->analyse_data(_result);
+ if (_result.coarse_bucket_index != -1) {
+ // TODO: (Tangyanzhao) reset _internal_collector to
hotkey_fine_data_collector
+ _state.store(hotkey_collector_state::FINE_DETECTING);
+ }
}
return;
default:
@@ -108,7 +184,7 @@ void
hotkey_collector::on_start_detect(dsn::replication::detect_hotkey_response
return;
case hotkey_collector_state::STOPPED:
_collector_start_time_second = dsn_now_s();
- // TODO: (Tangyanzhao) start coarse detecting
+ _internal_collector.reset(new hotkey_coarse_data_collector(this));
_state.store(hotkey_collector_state::COARSE_DETECTING);
resp.err = dsn::ERR_OK;
hint = fmt::format("starting to detect {} hotkey",
dsn::enum_to_string(_hotkey_type));
@@ -149,5 +225,31 @@ bool hotkey_collector::terminate_if_timeout()
return false;
}
+hotkey_coarse_data_collector::hotkey_coarse_data_collector(replica_base *base)
+ : internal_collector_base(base),
_hash_buckets(FLAGS_data_capture_hash_bucket_num)
+{
+ for (auto &bucket : _hash_buckets) {
+ bucket.store(0);
+ }
+}
+
+void hotkey_coarse_data_collector::capture_data(const dsn::blob &hash_key,
uint64_t weight)
+{
+ _hash_buckets[get_bucket_id(hash_key)].fetch_add(weight);
+}
+
+void hotkey_coarse_data_collector::analyse_data(detect_hotkey_result &result)
+{
+ std::vector<uint64_t> buckets(FLAGS_data_capture_hash_bucket_num);
+ for (int i = 0; i < buckets.size(); i++) {
+ buckets[i] = _hash_buckets[i].load();
+ _hash_buckets[i].store(0);
+ }
+ if (!find_outlier_index(
+ buckets, FLAGS_coarse_data_variance_threshold,
result.coarse_bucket_index)) {
+ result.coarse_bucket_index = -1;
+ }
+}
+
} // namespace server
} // namespace pegasus
diff --git a/src/server/hotkey_collector.h b/src/server/hotkey_collector.h
index 6cf34cc..d4b43a4 100644
--- a/src/server/hotkey_collector.h
+++ b/src/server/hotkey_collector.h
@@ -26,6 +26,11 @@ namespace server {
class internal_collector_base;

// Outcome of hotkey analysis; filled in by internal_collector_base::analyse_data().
struct detect_hotkey_result
{
    // Index of the hash bucket found hot by coarse detection; -1 means none.
    int coarse_bucket_index{-1};
};
+
// hotkey_collector is responsible for finding the hot keys after the partition
// has been detected as hot. The two types of hotkey, READ & WRITE, are
// detected separately.
@@ -82,25 +87,40 @@ private:
void terminate();
bool terminate_if_timeout();
+ detect_hotkey_result _result;
std::atomic<hotkey_collector_state> _state;
const dsn::replication::hotkey_type::type _hotkey_type;
std::shared_ptr<internal_collector_base> _internal_collector;
uint64_t _collector_start_time_second;
};
-class internal_collector_base
+class internal_collector_base : public dsn::replication::replica_base
{
public:
+ explicit internal_collector_base(replica_base *base) :
replica_base(base){};
virtual void capture_data(const dsn::blob &hash_key, uint64_t weight) = 0;
- virtual void analyse_data() = 0;
+ virtual void analyse_data(detect_hotkey_result &result) = 0;
};
// used in hotkey_collector_state::STOPPED and
hotkey_collector_state::FINISHED, avoid null pointers
class hotkey_empty_data_collector : public internal_collector_base
{
public:
- void capture_data(const dsn::blob &hash_key, uint64_t size) {}
- void analyse_data() {}
+ explicit hotkey_empty_data_collector(replica_base *base) :
internal_collector_base(base) {}
+ void capture_data(const dsn::blob &hash_key, uint64_t weight) override {}
+ void analyse_data(detect_hotkey_result &result) override {}
+};
+
+// TODO: (Tangyanzhao) add a unit test of hotkey_coarse_data_collector
+class hotkey_coarse_data_collector : public internal_collector_base
+{
+public:
+ explicit hotkey_coarse_data_collector(replica_base *base);
+ void capture_data(const dsn::blob &hash_key, uint64_t weight) override;
+ void analyse_data(detect_hotkey_result &result) override;
+
+private:
+ std::vector<std::atomic<uint64_t>> _hash_buckets;
};
} // namespace server
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]