This is an automated email from the ASF dual-hosted git repository.

laiyingchun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git


The following commit(s) were added to refs/heads/master by this push:
     new f2d850b  feat(hotkey): capture data part2 - declare coarse collector  (#624)
f2d850b is described below

commit f2d850bfb242ea06d87b0ab0801f1504175f4d8c
Author: Smilencer <[email protected]>
AuthorDate: Tue Oct 27 14:34:44 2020 +0800

    feat(hotkey): capture data part2 - declare coarse collector  (#624)
---
 src/server/hotkey_collector.cpp | 108 ++++++++++++++++++++++++++++++++++++++--
 src/server/hotkey_collector.h   |  28 +++++++++--
 2 files changed, 129 insertions(+), 7 deletions(-)

diff --git a/src/server/hotkey_collector.cpp b/src/server/hotkey_collector.cpp
index 16984ee..2d81595 100644
--- a/src/server/hotkey_collector.cpp
+++ b/src/server/hotkey_collector.cpp
@@ -19,6 +19,8 @@
 
 #include <dsn/dist/replication/replication_enums.h>
 #include <dsn/utility/smart_pointers.h>
+#include <dsn/utility/flags.h>
+#include <boost/functional/hash.hpp>
 #include "base/pegasus_key_schema.h"
 #include <dsn/dist/fmt_logging.h>
 #include <dsn/utility/flags.h>
@@ -26,18 +28,88 @@
 namespace pegasus {
 namespace server {
 
+DSN_DEFINE_int32("pegasus.server",
+                 coarse_data_variance_threshold,
+                 3,
+                 "the threshold of variance calculate to find the outliers");
+
+DSN_DEFINE_validator(coarse_data_variance_threshold,
+                     [](int32_t threshold) -> bool { return (threshold >= 0); });
+
+// TODO: (Tangyanzhao) add a limit to avoid changing when detecting
+DSN_DEFINE_int32("pegasus.server",
+                 data_capture_hash_bucket_num,
+                 37,
+                 "the number of data capture hash buckets");
+
+DSN_DEFINE_validator(data_capture_hash_bucket_num, [](int32_t bucket_num) -> bool {
+    if (bucket_num < 3) {
+        return false;
+    }
+    // data_capture_hash_bucket_num should be a prime number
+    for (int i = 2; i <= bucket_num / i; i++) {
+        if (bucket_num % i == 0) {
+            return false;
+        }
+    }
+    return true;
+});
+
 DSN_DEFINE_int32(
     "pegasus.server",
     max_seconds_to_detect_hotkey,
     150,
     "the max time (in seconds) allowed to capture hotkey, will stop if hotkey's not found");
 
+// 68–95–99.7 rule, same algorithm as hotspot_partition_calculator::stat_histories_analyse
+static bool
+find_outlier_index(const std::vector<uint64_t> &captured_keys, int threshold, int &hot_index)
+{
+    dcheck_gt(captured_keys.size(), 2);
+    int data_size = captured_keys.size();
+    // empirical rule to calculate hot point of each partition
+    // same algorithm as hotspot_partition_calculator::stat_histories_analyse
+    double table_captured_key_sum = 0;
+    int hot_value = 0;
+    for (int i = 0; i < data_size; i++) {
+        table_captured_key_sum += captured_keys[i];
+        if (captured_keys[i] > hot_value) {
+            hot_index = i;
+            hot_value = captured_keys[i];
+        }
+    }
+    // TODO: (Tangyanzhao) increase a judgment of table_captured_key_sum
+    double captured_keys_avg_count =
+        (table_captured_key_sum - captured_keys[hot_index]) / (data_size - 1);
+    double standard_deviation = 0;
+    for (int i = 0; i < data_size; i++) {
+        if (i != hot_index) {
+            standard_deviation += pow((captured_keys[i] - captured_keys_avg_count), 2);
+        }
+    }
+    standard_deviation = sqrt(standard_deviation / (data_size - 2));
+    double hot_point = (hot_value - captured_keys_avg_count) / standard_deviation;
+    if (hot_point >= threshold) {
+        return true;
+    } else {
+        hot_index = -1;
+        return false;
+    }
+}
+
+// TODO: (Tangyanzhao) replace it to xxhash
+static int get_bucket_id(dsn::string_view data)
+{
+    size_t hash_value = boost::hash_range(data.begin(), data.end());
+    return static_cast<int>(hash_value % FLAGS_data_capture_hash_bucket_num);
+}
+
 hotkey_collector::hotkey_collector(dsn::replication::hotkey_type::type hotkey_type,
                                    dsn::replication::replica_base *r_base)
     : replica_base(r_base),
       _state(hotkey_collector_state::STOPPED),
       _hotkey_type(hotkey_type),
-      _internal_collector(std::make_shared<hotkey_empty_data_collector>()),
+      _internal_collector(std::make_shared<hotkey_empty_data_collector>(this)),
       _collector_start_time_second(0)
 {
 }
@@ -78,7 +150,11 @@ void hotkey_collector::analyse_data()
     switch (_state.load()) {
     case hotkey_collector_state::COARSE_DETECTING:
         if (!terminate_if_timeout()) {
-            _internal_collector->analyse_data();
+            _internal_collector->analyse_data(_result);
+            if (_result.coarse_bucket_index != -1) {
+                // TODO: (Tangyanzhao) reset _internal_collector to hotkey_fine_data_collector
+                _state.store(hotkey_collector_state::FINE_DETECTING);
+            }
         }
         return;
     default:
@@ -108,7 +184,7 @@ void hotkey_collector::on_start_detect(dsn::replication::detect_hotkey_response
         return;
     case hotkey_collector_state::STOPPED:
         _collector_start_time_second = dsn_now_s();
-        // TODO: (Tangyanzhao) start coarse detecting
+        _internal_collector.reset(new hotkey_coarse_data_collector(this));
         _state.store(hotkey_collector_state::COARSE_DETECTING);
         resp.err = dsn::ERR_OK;
         hint = fmt::format("starting to detect {} hotkey", dsn::enum_to_string(_hotkey_type));
@@ -149,5 +225,31 @@ bool hotkey_collector::terminate_if_timeout()
     return false;
 }
 
+hotkey_coarse_data_collector::hotkey_coarse_data_collector(replica_base *base)
+    : internal_collector_base(base), _hash_buckets(FLAGS_data_capture_hash_bucket_num)
+{
+    for (auto &bucket : _hash_buckets) {
+        bucket.store(0);
+    }
+}
+
+void hotkey_coarse_data_collector::capture_data(const dsn::blob &hash_key, uint64_t weight)
+{
+    _hash_buckets[get_bucket_id(hash_key)].fetch_add(weight);
+}
+
+void hotkey_coarse_data_collector::analyse_data(detect_hotkey_result &result)
+{
+    std::vector<uint64_t> buckets(FLAGS_data_capture_hash_bucket_num);
+    for (int i = 0; i < buckets.size(); i++) {
+        buckets[i] = _hash_buckets[i].load();
+        _hash_buckets[i].store(0);
+    }
+    if (!find_outlier_index(
+            buckets, FLAGS_coarse_data_variance_threshold, result.coarse_bucket_index)) {
+        result.coarse_bucket_index = -1;
+    }
+}
+
 } // namespace server
 } // namespace pegasus
diff --git a/src/server/hotkey_collector.h b/src/server/hotkey_collector.h
index 6cf34cc..d4b43a4 100644
--- a/src/server/hotkey_collector.h
+++ b/src/server/hotkey_collector.h
@@ -26,6 +26,11 @@ namespace server {
 
 class internal_collector_base;
 
+struct detect_hotkey_result
+{
+    int coarse_bucket_index = -1;
+};
+
 //    hotkey_collector is responsible to find the hot keys after the partition
 //    was detected to be hot. The two types of hotkey, READ & WRITE, are detected
 //    separately.
@@ -82,25 +87,40 @@ private:
     void terminate();
     bool terminate_if_timeout();
 
+    detect_hotkey_result _result;
     std::atomic<hotkey_collector_state> _state;
     const dsn::replication::hotkey_type::type _hotkey_type;
     std::shared_ptr<internal_collector_base> _internal_collector;
     uint64_t _collector_start_time_second;
 };
 
-class internal_collector_base
+class internal_collector_base : public dsn::replication::replica_base
 {
 public:
+    explicit internal_collector_base(replica_base *base) : replica_base(base){};
     virtual void capture_data(const dsn::blob &hash_key, uint64_t weight) = 0;
-    virtual void analyse_data() = 0;
+    virtual void analyse_data(detect_hotkey_result &result) = 0;
 };
 
 // used in hotkey_collector_state::STOPPED and hotkey_collector_state::FINISHED, avoid null pointers
 class hotkey_empty_data_collector : public internal_collector_base
 {
 public:
-    void capture_data(const dsn::blob &hash_key, uint64_t size) {}
-    void analyse_data() {}
+    explicit hotkey_empty_data_collector(replica_base *base) : internal_collector_base(base) {}
+    void capture_data(const dsn::blob &hash_key, uint64_t weight) override {}
+    void analyse_data(detect_hotkey_result &result) override {}
+};
+
+// TODO: (Tangyanzhao) add a unit test of hotkey_coarse_data_collector
+class hotkey_coarse_data_collector : public internal_collector_base
+{
+public:
+    explicit hotkey_coarse_data_collector(replica_base *base);
+    void capture_data(const dsn::blob &hash_key, uint64_t weight) override;
+    void analyse_data(detect_hotkey_result &result) override;
+
+private:
+    std::vector<std::atomic<uint64_t>> _hash_buckets;
 };
 
 } // namespace server


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to