Smityz commented on a change in pull request #631:
URL: https://github.com/apache/incubator-pegasus/pull/631#discussion_r513904935



##########
File path: src/server/hotkey_collector.cpp
##########
@@ -251,5 +261,107 @@ void hotkey_coarse_data_collector::analyse_data(detect_hotkey_result &result)
     }
 }
 
+hotkey_fine_data_collector::hotkey_fine_data_collector(
+    replica_base *base,
+    dsn::replication::hotkey_type::type hotkey_type,
+    int target_bucket_index,
+    int max_queue_size)
+    : internal_collector_base(base),
+      _hotkey_type(hotkey_type),
+      _max_queue_size(max_queue_size),
+      _target_bucket_index(target_bucket_index)
+{
+    // Any type other than READ (i.e. WRITE) is captured through one shared queue.
+    if (_hotkey_type != dsn::replication::hotkey_type::READ) {
+        _string_capture_queue_vec.emplace_back(_max_queue_size);
+        return;
+    }
+
+    // READ: requests may be served by any thread of THREAD_POOL_LOCAL_APP, so
+    // every such thread gets a queue of its own, found later through
+    // _thread_queue_map by the thread's native tid.
+    // NOTE(review): presumably because ReaderWriterQueue allows only a single
+    // producer -- confirm.
+    const auto threads = dsn::get_threadpool_threads_info(THREAD_POOL_LOCAL_APP);
+    const int queue_num = threads.size();
+
+    _string_capture_queue_vec.reserve(queue_num);
+    for (int idx = 0; idx < queue_num; ++idx) {
+        // Queue slot `idx` is owned by the thread threads[idx].
+        _thread_queue_map.insert(std::make_pair(threads[idx]->native_tid(), idx));
+        // Every ReaderWriterQueue is constructed with _max_queue_size as its size.
+        _string_capture_queue_vec.emplace_back(_max_queue_size);
+    }
+}
+
+// Records one access to `hash_key` with the given `weight`, but only when the
+// key hashes into the bucket this collector is targeting. The sample goes into
+// the queue chosen by get_queue_index().
+void hotkey_fine_data_collector::capture_data(const dsn::blob &hash_key, uint64_t weight)
+{
+    if (get_bucket_id(hash_key) == _target_bucket_index) {
+        auto &queue = _string_capture_queue_vec[get_queue_index()];
+        // Best-effort: the result of try_emplace is ignored, so a sample that
+        // cannot be enqueued (presumably a full queue -- confirm) is dropped.
+        queue.try_emplace(std::make_pair(hash_key, weight));
+    }
+}
+
+// Hasher for dsn::blob keys: hashes the blob's byte content (viewed through
+// dsn::string_view), so blobs with equal bytes produce equal hashes.
+struct blob_hash
+{
+    std::size_t operator()(const dsn::blob &key) const
+    {
+        const dsn::string_view bytes(key);
+        return boost::hash_range(bytes.begin(), bytes.end());
+    }
+};
+// Equality predicate for dsn::blob keys: two blobs are equal when their byte
+// contents (viewed through dsn::string_view) are equal.
+// Returns bool because it serves as the KeyEqual of std::unordered_map.
+struct blob_equal
+{
+    bool operator()(const dsn::blob &lhs, const dsn::blob &rhs) const
+    {
+        return dsn::string_view(lhs) == dsn::string_view(rhs);
+    }
+};
+
+// Returns the index into _string_capture_queue_vec that the calling thread
+// must use. WRITE collection has a single queue (index 0). For any other type,
+// the index is looked up by the caller's native tid in _thread_queue_map; an
+// unknown thread trips the dassert.
+int hotkey_fine_data_collector::get_queue_index()
+{
+    if (_hotkey_type != dsn::replication::hotkey_type::WRITE) {
+        const int tid = dsn::utils::get_current_tid();
+        const auto slot = _thread_queue_map.find(tid);
+        dassert(slot != _thread_queue_map.end(),
+                "Can't find the queue corresponding to the thread");
+        return slot->second;
+    }
+    return 0;
+}
+
+void hotkey_fine_data_collector::analyse_data(detect_hotkey_result &result)
+{
+    std::unordered_map<dsn::blob, uint64_t, blob_hash, blob_equal> 
hash_key_accessed_cnt;
+    for (auto &rw_queue : _string_capture_queue_vec) {
+        std::pair<dsn::blob, int> hash_key_pair;
+        // prevent endless loop
+        int collect_sum = 0;
+        while (rw_queue.try_dequeue(hash_key_pair) && ++collect_sum <= 
_max_queue_size) {
+            hash_key_accessed_cnt[hash_key_pair.first] += hash_key_pair.second;
+        }
+    }
+
+    if (hash_key_accessed_cnt.empty()) {
+        return;
+    }
+
+    std::vector<uint64_t> counts;
+    counts.reserve(FLAGS_data_capture_hash_bucket_num);
+    dsn::string_view count_max_key;
+    uint64_t count_max = 0;
+    for (const auto &iter : hash_key_accessed_cnt) {
+        counts.push_back(iter.second);
+        if (iter.second > count_max) {
+            count_max = iter.second;
+            count_max_key = iter.first; // the key with the max accessed count.
+        }
+    }
+
+    // if the accessed counts differ hugely (depends on the variance 
threshold),
+    // the max key is the hotkey.
+    int hot_index;
+    if (counts.size() < 3 ||
+        find_outlier_index(counts, FLAGS_fine_data_variance_threshold, 
hot_index)) {
+        result.hot_hash_key = std::string(count_max_key);
+    }

Review comment:
       Relying only on the hottest key is not accurate; we must also ensure that it is an outlier within its hash bucket.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscr...@pegasus.apache.org
For additional commands, e-mail: dev-h...@pegasus.apache.org

Reply via email to