This is an automated email from the ASF dual-hosted git repository.
wutao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git
The following commit(s) were added to refs/heads/master by this push:
new aeb2a54 feat(hotkey): capture data part3 - declare fine collector
(#631)
aeb2a54 is described below
commit aeb2a54fc94410a38f6dfb1abafb79d302a51553
Author: Smilencer <[email protected]>
AuthorDate: Fri Nov 6 14:15:47 2020 +0800
feat(hotkey): capture data part3 - declare fine collector (#631)
---
rdsn | 2 +-
src/server/hotkey_collector.cpp | 115 ++++++++++++++++++++++++++++++++++------
src/server/hotkey_collector.h | 22 +++++++-
3 files changed, 120 insertions(+), 19 deletions(-)
diff --git a/rdsn b/rdsn
index 1812022..ec47b0c 160000
--- a/rdsn
+++ b/rdsn
@@ -1 +1 @@
-Subproject commit 18120226e6123fbf2ed744cc580e914be6171f85
+Subproject commit ec47b0cfd8cbdc30b69b1d252e59588afb3f0b51
diff --git a/src/server/hotkey_collector.cpp b/src/server/hotkey_collector.cpp
index 2d81595..053d72c 100644
--- a/src/server/hotkey_collector.cpp
+++ b/src/server/hotkey_collector.cpp
@@ -21,32 +21,36 @@
#include <dsn/utility/smart_pointers.h>
#include <dsn/utility/flags.h>
#include <boost/functional/hash.hpp>
-#include "base/pegasus_key_schema.h"
#include <dsn/dist/fmt_logging.h>
#include <dsn/utility/flags.h>
+#include "base/pegasus_key_schema.h"
namespace pegasus {
namespace server {
-DSN_DEFINE_int32("pegasus.server",
- coarse_data_variance_threshold,
- 3,
- "the threshold of variance calculate to find the outliers");
+// Passed to find_outlier_index() by hotkey_coarse_data_collector::analyse_data() to decide
+// whether one hash bucket is hot. The flag is unsigned, so the ">= 0" validator that guarded
+// the old signed threshold is no longer needed.
+DSN_DEFINE_uint32(
+ "pegasus.server",
+ hot_bucket_variance_threshold,
+ 3,
+ "the variance threshold to detect hot bucket during coarse analysis of hotkey detection");
-DSN_DEFINE_validator(coarse_data_variance_threshold,
- [](int32_t threshold) -> bool { return (threshold >= 0); });
+// Passed to find_outlier_index() by hotkey_fine_data_collector::analyse_data() to decide
+// whether one hash key is hot.
+DSN_DEFINE_uint32(
+ "pegasus.server",
+ hot_key_variance_threshold,
+ 3,
+ "the variance threshold to detect hot key during fine analysis of hotkey detection");
// TODO: (Tangyanzhao) add a limit to avoid changing when detecting
-DSN_DEFINE_int32("pegasus.server",
- data_capture_hash_bucket_num,
- 37,
- "the number of data capture hash buckets");
+// Number of hash buckets that get_bucket_id() spreads hash keys over.
+DSN_DEFINE_uint32("pegasus.server",
+ hotkey_buckets_num,
+ 37,
+ "the number of data capture hash buckets");
-DSN_DEFINE_validator(data_capture_hash_bucket_num, [](int32_t bucket_num) ->
bool {
+DSN_DEFINE_validator(hotkey_buckets_num, [](int32_t bucket_num) -> bool {
if (bucket_num < 3) {
return false;
}
- // data_capture_hash_bucket_num should be a prime number
+ // hotkey_buckets_num should be a prime number
for (int i = 2; i <= bucket_num / i; i++) {
if (bucket_num % i == 0) {
return false;
@@ -101,7 +105,7 @@ find_outlier_index(const std::vector<uint64_t>
&captured_keys, int threshold, in
+// Maps `data` to a bucket index in [0, FLAGS_hotkey_buckets_num) by hashing its bytes with
+// boost::hash_range and taking the remainder.
static int get_bucket_id(dsn::string_view data)
{
size_t hash_value = boost::hash_range(data.begin(), data.end());
- return static_cast<int>(hash_value % FLAGS_data_capture_hash_bucket_num);
+ return static_cast<int>(hash_value % FLAGS_hotkey_buckets_num);
}
hotkey_collector::hotkey_collector(dsn::replication::hotkey_type::type
hotkey_type,
@@ -226,7 +230,7 @@ bool hotkey_collector::terminate_if_timeout()
}
hotkey_coarse_data_collector::hotkey_coarse_data_collector(replica_base *base)
- : internal_collector_base(base),
_hash_buckets(FLAGS_data_capture_hash_bucket_num)
+ : internal_collector_base(base), _hash_buckets(FLAGS_hotkey_buckets_num)
{
for (auto &bucket : _hash_buckets) {
bucket.store(0);
@@ -240,16 +244,93 @@ void hotkey_coarse_data_collector::capture_data(const
dsn::blob &hash_key, uint6
void hotkey_coarse_data_collector::analyse_data(detect_hotkey_result &result)
{
- std::vector<uint64_t> buckets(FLAGS_data_capture_hash_bucket_num);
+ // Snapshot (and reset) every bucket counter. The snapshot is sized from _hash_buckets
+ // itself rather than by re-reading FLAGS_hotkey_buckets_num: if that flag changed after
+ // construction (see the TODO on its definition), the loop below would index
+ // _hash_buckets out of range.
+ std::vector<uint64_t> buckets(_hash_buckets.size());
for (int i = 0; i < buckets.size(); i++) {
buckets[i] = _hash_buckets[i].load();
_hash_buckets[i].store(0);
}
if (!find_outlier_index(
- buckets, FLAGS_coarse_data_variance_threshold, result.coarse_bucket_index)) {
+ buckets, FLAGS_hot_bucket_variance_threshold, result.coarse_bucket_index)) {
result.coarse_bucket_index = -1;
}
}
+// Initializers are listed in the member declaration order of hotkey_collector.h.
+// `max_queue_size` is used both as the queue's constructor capacity and as the
+// per-call dequeue bound kept in _max_queue_size.
+hotkey_fine_data_collector::hotkey_fine_data_collector(
+    replica_base *base, int target_bucket_index, int max_queue_size)
+    : internal_collector_base(base),
+      _max_queue_size(max_queue_size),
+      _target_bucket_index(target_bucket_index),
+      _capture_key_queue(max_queue_size)
+{
+}
+
+// Records (hash_key, weight) only for hash keys that hash into the target bucket
+// (presumably the bucket reported by coarse analysis); all other keys are ignored.
+void hotkey_fine_data_collector::capture_data(const dsn::blob &hash_key, uint64_t weight)
+{
+    if (get_bucket_id(hash_key) == _target_bucket_index) {
+        // try_enqueue() only succeeds when room is already allocated; when it fails the
+        // key is abandoned, so its return value is intentionally ignored.
+        _capture_key_queue.try_enqueue(std::make_pair(hash_key, weight));
+    }
+}
+
+namespace {
+// Hash functor that lets dsn::blob be used as an unordered-container key: it hashes the
+// bytes the blob refers to, so blobs with equal contents hash equally. It lives in an
+// unnamed namespace because it is a helper local to this .cpp (like the static
+// get_bucket_id), which keeps it from colliding with a same-named type in another TU.
+struct blob_hash
+{
+    std::size_t operator()(const dsn::blob &str) const
+    {
+        dsn::string_view cp(str);
+        return boost::hash_range(cp.begin(), cp.end());
+    }
+};
+} // namespace
+
+namespace {
+// Key-equality predicate paired with blob_hash: two blobs are equal when the byte ranges
+// they refer to compare equal. It returns bool, as required of an unordered-container
+// KeyEqual. It lives in an unnamed namespace because it is a helper local to this .cpp.
+struct blob_equal
+{
+    bool operator()(const dsn::blob &lhs, const dsn::blob &rhs) const
+    {
+        return dsn::string_view(lhs) == dsn::string_view(rhs);
+    }
+};
+} // namespace
+
+// Drains the captured (hash key, weight) pairs and sums the weight per hash key. When a
+// hotkey is detected, it stores the heaviest hash key in result.hot_hash_key; otherwise
+// result is left untouched.
+void hotkey_fine_data_collector::analyse_data(detect_hotkey_result &result)
+{
+    // hash key -> total weight captured for it
+    std::unordered_map<dsn::blob, uint64_t, blob_hash, blob_equal> hash_keys_weight;
+    std::pair<dsn::blob, uint64_t> key_weight_pair;
+    // Bound the drain to _max_queue_size elements to prevent an endless loop (e.g. if
+    // producers keep enqueueing while the queue is being drained).
+    uint32_t dequeue_cnt = 0;
+    while (++dequeue_cnt <= _max_queue_size && _capture_key_queue.try_dequeue(key_weight_pair)) {
+        hash_keys_weight[key_weight_pair.first] += key_weight_pair.second;
+    }
+
+    if (hash_keys_weight.empty()) {
+        return;
+    }
+
+    // Gather the weight of every captured hash key and locate the heaviest one. An iterator
+    // is tracked (instead of a running maximum initialised to 0) so that a real key is
+    // still selected when every captured weight is 0. On ties the first key seen wins.
+    std::vector<uint64_t> weights;
+    weights.reserve(hash_keys_weight.size());
+    auto weight_max_iter = hash_keys_weight.cbegin();
+    for (auto iter = hash_keys_weight.cbegin(); iter != hash_keys_weight.cend(); ++iter) {
+        weights.push_back(iter->second);
+        if (iter->second > weight_max_iter->second) {
+            weight_max_iter = iter;
+        }
+    }
+
+    // hash_keys_weight accumulates the weight of each hash key captured in this period, and
+    // the number of distinct keys (weights.size()) decides how a hotkey is determined:
+    // - fewer than 3 keys: the heaviest key is reported unconditionally, because the
+    //   two-level filtering (coarse bucket first, then fine key) significantly reduces the
+    //   possibility that the hottest key is not the actual hotkey;
+    // - 3 keys or more: find_outlier_index() over all weights decides whether a hotkey
+    //   exists; if it does, the heaviest key is reported (the returned index is unused).
+    int hot_index = -1;
+    if (weights.size() < 3 ||
+        find_outlier_index(weights, FLAGS_hot_key_variance_threshold, hot_index)) {
+        result.hot_hash_key = std::string(dsn::string_view(weight_max_iter->first));
+    }
+}
+
} // namespace server
} // namespace pegasus
diff --git a/src/server/hotkey_collector.h b/src/server/hotkey_collector.h
index d4b43a4..937d07d 100644
--- a/src/server/hotkey_collector.h
+++ b/src/server/hotkey_collector.h
@@ -18,8 +18,9 @@
#pragma once
#include <dsn/dist/replication/replication_types.h>
-#include "hotkey_collector_state.h"
+#include <concurrentqueue/concurrentqueue.h>
#include <dsn/dist/replication/replica_base.h>
+#include "hotkey_collector_state.h"
namespace pegasus {
namespace server {
@@ -29,6 +30,7 @@ class internal_collector_base;
+// Output filled in by the internal collectors' analyse_data().
struct detect_hotkey_result
{
+ // Hot bucket index found by coarse analysis; set to -1 when no outlier bucket is found.
int coarse_bucket_index = -1;
+ // Hot hash key found by fine analysis; only assigned when a hot key is detected.
+ std::string hot_hash_key;
};
// hotkey_collector is responsible to find the hot keys after the partition
@@ -120,8 +122,26 @@ public:
void analyse_data(detect_hotkey_result &result) override;
private:
+ hotkey_coarse_data_collector() = delete;
+
std::vector<std::atomic<uint64_t>> _hash_buckets;
};
+// Fine-analysis collector. capture_data() keeps only the hash keys whose get_bucket_id()
+// equals the target bucket and queues them. analyse_data() then reports the heaviest queued
+// hash key as result.hot_hash_key when a hotkey is detected.
+class hotkey_fine_data_collector : public internal_collector_base
+{
+public:
+    // `target_bucket_index`: the bucket whose hash keys are captured.
+    // `max_queue_size`: capacity the capture queue is constructed with, and the maximum
+    // number of elements a single analyse_data() call dequeues.
+    hotkey_fine_data_collector(replica_base *base, int target_bucket_index, int max_queue_size);
+    void capture_data(const dsn::blob &hash_key, uint64_t weight) override;
+    void analyse_data(detect_hotkey_result &result) override;
+
+private:
+    hotkey_fine_data_collector() = delete;
+
+    const uint32_t _max_queue_size;
+    // Signed, like the constructor argument and get_bucket_id()'s return type, so that the
+    // bucket comparison in capture_data() involves no signed/unsigned conversion.
+    const int _target_bucket_index;
+    // ConcurrentQueue is a lock-free queue to capture keys
+    moodycamel::ConcurrentQueue<std::pair<dsn::blob, uint64_t>> _capture_key_queue;
+};
+
} // namespace server
} // namespace pegasus
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]