This is an automated email from the ASF dual-hosted git repository.

wutao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git


The following commit(s) were added to refs/heads/master by this push:
     new aeb2a54  feat(hotkey): capture data part3 - declare fine collector (#631)
aeb2a54 is described below

commit aeb2a54fc94410a38f6dfb1abafb79d302a51553
Author: Smilencer <[email protected]>
AuthorDate: Fri Nov 6 14:15:47 2020 +0800

    feat(hotkey): capture data part3 - declare fine collector  (#631)
---
 rdsn                            |   2 +-
 src/server/hotkey_collector.cpp | 115 ++++++++++++++++++++++++++++++++++------
 src/server/hotkey_collector.h   |  22 +++++++-
 3 files changed, 120 insertions(+), 19 deletions(-)

diff --git a/rdsn b/rdsn
index 1812022..ec47b0c 160000
--- a/rdsn
+++ b/rdsn
@@ -1 +1 @@
-Subproject commit 18120226e6123fbf2ed744cc580e914be6171f85
+Subproject commit ec47b0cfd8cbdc30b69b1d252e59588afb3f0b51
diff --git a/src/server/hotkey_collector.cpp b/src/server/hotkey_collector.cpp
index 2d81595..053d72c 100644
--- a/src/server/hotkey_collector.cpp
+++ b/src/server/hotkey_collector.cpp
@@ -21,32 +21,36 @@
 #include <dsn/utility/smart_pointers.h>
 #include <dsn/utility/flags.h>
 #include <boost/functional/hash.hpp>
-#include "base/pegasus_key_schema.h"
 #include <dsn/dist/fmt_logging.h>
 #include <dsn/utility/flags.h>
+#include "base/pegasus_key_schema.h"
 
 namespace pegasus {
 namespace server {
 
-DSN_DEFINE_int32("pegasus.server",
-                 coarse_data_variance_threshold,
-                 3,
-                 "the threshold of variance calculate to find the outliers");
+DSN_DEFINE_uint32(
+    "pegasus.server",
+    hot_bucket_variance_threshold,
+    3,
+    "the variance threshold to detect hot bucket during coarse analysis of 
hotkey detection");
 
-DSN_DEFINE_validator(coarse_data_variance_threshold,
-                     [](int32_t threshold) -> bool { return (threshold >= 0); 
});
+DSN_DEFINE_uint32(
+    "pegasus.server",
+    hot_key_variance_threshold,
+    3,
+    "the variance threshold to detect hot key during fine analysis of hotkey 
detection");
 
 // TODO: (Tangyanzhao) add a limit to avoid changing when detecting
-DSN_DEFINE_int32("pegasus.server",
-                 data_capture_hash_bucket_num,
-                 37,
-                 "the number of data capture hash buckets");
+DSN_DEFINE_uint32("pegasus.server",
+                  hotkey_buckets_num,
+                  37,
+                  "the number of data capture hash buckets");
 
-DSN_DEFINE_validator(data_capture_hash_bucket_num, [](int32_t bucket_num) -> 
bool {
+DSN_DEFINE_validator(hotkey_buckets_num, [](int32_t bucket_num) -> bool {
     if (bucket_num < 3) {
         return false;
     }
-    // data_capture_hash_bucket_num should be a prime number
+    // hotkey_buckets_num should be a prime number
     for (int i = 2; i <= bucket_num / i; i++) {
         if (bucket_num % i == 0) {
             return false;
@@ -101,7 +105,7 @@ find_outlier_index(const std::vector<uint64_t> &captured_keys, int threshold, in
 static int get_bucket_id(dsn::string_view data)
 {
+    // Maps a key to a bucket index in [0, FLAGS_hotkey_buckets_num) by hashing its
+    // bytes with boost::hash_range. hotkey_fine_data_collector::capture_data uses it
+    // to keep only the keys that fall into its target bucket.
     size_t hash_value = boost::hash_range(data.begin(), data.end());
-    return static_cast<int>(hash_value % FLAGS_data_capture_hash_bucket_num);
+    return static_cast<int>(hash_value % FLAGS_hotkey_buckets_num);
 }
 
 hotkey_collector::hotkey_collector(dsn::replication::hotkey_type::type 
hotkey_type,
@@ -226,7 +230,7 @@ bool hotkey_collector::terminate_if_timeout()
 }
 
 hotkey_coarse_data_collector::hotkey_coarse_data_collector(replica_base *base)
-    : internal_collector_base(base), 
_hash_buckets(FLAGS_data_capture_hash_bucket_num)
+    : internal_collector_base(base), _hash_buckets(FLAGS_hotkey_buckets_num)
 {
     for (auto &bucket : _hash_buckets) {
         bucket.store(0);
@@ -240,16 +244,93 @@ void hotkey_coarse_data_collector::capture_data(const dsn::blob &hash_key, uint6
 
 void hotkey_coarse_data_collector::analyse_data(detect_hotkey_result &result)
 {
-    std::vector<uint64_t> buckets(FLAGS_data_capture_hash_bucket_num);
+    // Snapshots and resets every bucket counter, then asks find_outlier_index for a
+    // single outlier bucket; coarse_bucket_index is set to -1 when it returns false.
+    std::vector<uint64_t> buckets(FLAGS_hotkey_buckets_num);
+    // NOTE(review): load() followed by store(0) is not one atomic step; if capture_data
+    // updates these counters concurrently, weight added between the two calls is lost.
+    // _hash_buckets[i].exchange(0) would snapshot and reset in a single atomic operation.
     for (int i = 0; i < buckets.size(); i++) {
         buckets[i] = _hash_buckets[i].load();
         _hash_buckets[i].store(0);
     }
     if (!find_outlier_index(
-            buckets, FLAGS_coarse_data_variance_threshold, result.coarse_bucket_index)) {
+            buckets, FLAGS_hot_bucket_variance_threshold, result.coarse_bucket_index)) {
         result.coarse_bucket_index = -1;
     }
 }
 
+// Fine-stage collector: capture_data only queues keys that hash into
+// `target_bucket_index`. `max_queue_size` is passed to the capture queue's constructor
+// and also caps how many entries a single analyse_data call dequeues.
+// NOTE(review): both sizes are stored unsigned, so a negative max_queue_size would wrap;
+// callers are presumably expected to pass a positive value — confirm.
+hotkey_fine_data_collector::hotkey_fine_data_collector(replica_base *base,
+                                                       int target_bucket_index,
+                                                       int max_queue_size)
+    : internal_collector_base(base),
+      _max_queue_size(max_queue_size),
+      _target_bucket_index(target_bucket_index),
+      _capture_key_queue(max_queue_size)
+{
+}
+
+// Queues (hash_key, weight) for the fine analysis, but only when the key hashes into
+// the bucket this collector targets; all other keys are ignored.
+void hotkey_fine_data_collector::capture_data(const dsn::blob &hash_key, uint64_t weight)
+{
+    const bool in_target_bucket = (get_bucket_id(hash_key) == _target_bucket_index);
+    if (in_target_bucket) {
+        // best effort: try_enqueue may fail (e.g. no room left in the queue), in which
+        // case this sample is simply dropped
+        _capture_key_queue.try_enqueue(std::make_pair(hash_key, weight));
+    }
+}
+
+// Hash functor over the raw bytes of a dsn::blob, so blobs with equal content hash
+// equally; uses the same boost::hash_range scheme as get_bucket_id.
+struct blob_hash
+{
+    std::size_t operator()(const dsn::blob &key) const
+    {
+        const dsn::string_view bytes(key);
+        return boost::hash_range(bytes.begin(), bytes.end());
+    }
+};
+
+// Equality predicate comparing the raw bytes of two dsn::blob values. It pairs with
+// blob_hash so that std::unordered_map can key on blob content. It returns bool:
+// a KeyEqual is a predicate, and the original std::size_t return type was a typo.
+struct blob_equal
+{
+    bool operator()(const dsn::blob &lhs, const dsn::blob &rhs) const
+    {
+        return dsn::string_view(lhs) == dsn::string_view(rhs);
+    }
+};
+
+// Drains up to _max_queue_size captured (hash_key, weight) samples, sums the weight per
+// hash key, and reports the heaviest key in result.hot_hash_key when it qualifies as
+// hot. result is left untouched when nothing was captured or no hotkey is found.
+void hotkey_fine_data_collector::analyse_data(detect_hotkey_result &result)
+{
+    // hashkey -> summed weight
+    std::unordered_map<dsn::blob, uint64_t, blob_hash, blob_equal> hash_keys_weight;
+    std::pair<dsn::blob, uint64_t> key_weight_pair;
+    // prevent endless loop, limit the number of elements analyzed not to exceed the queue size
+    // (producers may keep enqueueing while we dequeue)
+    uint32_t dequeue_cnt = 0;
+    while (++dequeue_cnt <= _max_queue_size && _capture_key_queue.try_dequeue(key_weight_pair)) {
+        hash_keys_weight[key_weight_pair.first] += key_weight_pair.second;
+    }
+
+    if (hash_keys_weight.empty()) {
+        return;
+    }
+
+    // the weight of all the collected hash keys
+    std::vector<uint64_t> weights;
+    weights.reserve(hash_keys_weight.size());
+    // view into a key of hash_keys_weight; valid only while that map is alive
+    dsn::string_view weight_max_key; // the hashkey with the max weight
+    uint64_t weight_max = 0;         // the max weight so far
+    // NOTE(review): because the comparison below is strict and starts from 0, if every
+    // captured weight is 0 then weight_max_key stays empty.
+    for (const auto &iter : hash_keys_weight) {
+        weights.push_back(iter.second);
+        if (iter.second > weight_max) {
+            weight_max = iter.second;
+            weight_max_key = iter.first;
+        }
+    }
+
+    // hash_keys_weight holds the summed weight of each hash key dequeued in this call.
+    // The size of weights influences our hotkey determination strategy
+    // weights.size() <= 2: the hotkey must exist (the most weighted key), because
+    //                      the two-level filtering significantly reduces the
+    //                      possibility that the hottest key is not the actual hotkey.
+    // weights.size() >= 3: use find_outlier_index to determine whether a hotkey exists
+    int hot_index;
+    if (weights.size() < 3 ||
+        find_outlier_index(weights, FLAGS_hot_key_variance_threshold, hot_index)) {
+        result.hot_hash_key = std::string(weight_max_key);
+    }
+}
+
 } // namespace server
 } // namespace pegasus
diff --git a/src/server/hotkey_collector.h b/src/server/hotkey_collector.h
index d4b43a4..937d07d 100644
--- a/src/server/hotkey_collector.h
+++ b/src/server/hotkey_collector.h
@@ -18,8 +18,9 @@
 #pragma once
 
 #include <dsn/dist/replication/replication_types.h>
-#include "hotkey_collector_state.h"
+#include <concurrentqueue/concurrentqueue.h>
 #include <dsn/dist/replication/replica_base.h>
+#include "hotkey_collector_state.h"
 
 namespace pegasus {
 namespace server {
@@ -29,6 +30,7 @@ class internal_collector_base;
 struct detect_hotkey_result
 {
+    // Set by hotkey_coarse_data_collector::analyse_data; -1 (also the default) when
+    // find_outlier_index reports no outlier bucket.
     int coarse_bucket_index = -1;
+    // Set by hotkey_fine_data_collector::analyse_data when a hot hash key is detected;
+    // empty by default.
+    std::string hot_hash_key;
 };
 
 //    hotkey_collector is responsible to find the hot keys after the partition
@@ -120,8 +122,26 @@ public:
     void analyse_data(detect_hotkey_result &result) override;
 
 private:
+    hotkey_coarse_data_collector() = delete;
+
     std::vector<std::atomic<uint64_t>> _hash_buckets;
 };
 
+// Fine-grained stage of hotkey detection. It captures only the keys that hash into the
+// bucket chosen by the coarse stage, and on analysis reports the heaviest key as hot
+// when it qualifies.
+class hotkey_fine_data_collector : public internal_collector_base
+{
+public:
+    hotkey_fine_data_collector(replica_base *base, int target_bucket_index, int max_queue_size);
+    void capture_data(const dsn::blob &hash_key, uint64_t weight) override;
+    void analyse_data(detect_hotkey_result &result) override;
+
+private:
+    hotkey_fine_data_collector() = delete;
+
+    // upper bound on how many queued entries one analyse_data call consumes
+    const uint32_t _max_queue_size;
+    // int to match the constructor parameter and get_bucket_id()'s return type, so the
+    // bucket comparison in capture_data does not mix signed and unsigned values
+    const int _target_bucket_index;
+    // ConcurrentQueue is a lock-free queue to capture keys
+    moodycamel::ConcurrentQueue<std::pair<dsn::blob, uint64_t>> _capture_key_queue;
+};
+
 } // namespace server
 } // namespace pegasus


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to