hycdong commented on a change in pull request #631: URL: https://github.com/apache/incubator-pegasus/pull/631#discussion_r513891592
########## File path: src/server/hotkey_collector.h ##########
@@ -123,5 +126,30 @@ class hotkey_coarse_data_collector : public internal_collector_base
     std::vector<std::atomic<uint64_t>> _hash_buckets;
 };
+typedef std::vector<moodycamel::ReaderWriterQueue<std::pair<dsn::blob, uint64_t>>>
+    string_capture_queue_vec;
+
+class hotkey_fine_data_collector : public internal_collector_base
+{
+public:
+    hotkey_fine_data_collector() = delete;
+    explicit hotkey_fine_data_collector(replica_base *base,
+                                        dsn::replication::hotkey_type::type hotkey_type,
+                                        int target_bucket_index,
+                                        int max_queue_size);
+    void capture_data(const dsn::blob &hash_key, uint64_t weight) override;
+    void analyse_data(detect_hotkey_result &result) override;
+
+private:
+    inline int get_queue_index();
+
+    const dsn::replication::hotkey_type::type _hotkey_type;
+    int _max_queue_size;
+    const int _target_bucket_index;
+    // thread's native id -> data queue id.
+    std::unordered_map<int, int> _thread_queue_map;
+    string_capture_queue_vec _string_capture_queue_vec;

Review comment: I suggest adding some comments for this structure.
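For example, something along these lines would help (just a sketch of what I have in mind, based on how the constructor in hotkey_collector.cpp below uses these fields; the wording is mine, not from this patch):

```cpp
// For READ hotkey detection there is one single-producer/single-consumer
// ReaderWriterQueue per THREAD_POOL_LOCAL_APP worker thread, so each worker
// pushes captured data only into its own queue; for WRITE there is a single
// queue. Each entry is a captured (hash_key, weight) pair that analyse_data()
// later drains and aggregates.
string_capture_queue_vec _string_capture_queue_vec;

// Maps a worker thread's native tid to the index of that thread's queue in
// _string_capture_queue_vec; looked up by get_queue_index() when capturing.
std::unordered_map<int, int> _thread_queue_map;
```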
########## File path: src/server/hotkey_collector.cpp ##########
@@ -251,5 +261,107 @@ void hotkey_coarse_data_collector::analyse_data(detect_hotkey_result &result)
     }
 }
+hotkey_fine_data_collector::hotkey_fine_data_collector(
+    replica_base *base,
+    dsn::replication::hotkey_type::type hotkey_type,
+    int target_bucket_index,
+    int max_queue_size)
+    : internal_collector_base(base),
+      _hotkey_type(hotkey_type),
+      _max_queue_size(max_queue_size),
+      _target_bucket_index(target_bucket_index)
+{
+    // Distinguish between single-threaded and multi-threaded environments
+    if (_hotkey_type == dsn::replication::hotkey_type::READ) {
+
+        auto threads = dsn::get_threadpool_threads_info(THREAD_POOL_LOCAL_APP);
+        int queue_num = threads.size();
+
+        _string_capture_queue_vec.reserve(queue_num);
+        for (int i = 0; i < queue_num; i++) {
+            _thread_queue_map.insert(std::make_pair(threads[i]->native_tid(), i));
+
+            // Create a vector of the ReaderWriterQueue whose size = _max_queue_size
+            _string_capture_queue_vec.emplace_back(_max_queue_size);
+        }
+
+    } else { // WRITE
+        _string_capture_queue_vec.emplace_back(_max_queue_size);
+    }
+}
+
+void hotkey_fine_data_collector::capture_data(const dsn::blob &hash_key, uint64_t weight)
+{
+    if (get_bucket_id(hash_key) != _target_bucket_index) {
+        return;
+    }
+    _string_capture_queue_vec[get_queue_index()].try_emplace(std::make_pair(hash_key, weight));
+}
+
+struct blob_hash
+{
+    std::size_t operator()(const dsn::blob &str) const
+    {
+        dsn::string_view cp(str);
+        return boost::hash_range(cp.begin(), cp.end());
+    }
+};
+struct blob_equal
+{
+    std::size_t operator()(const dsn::blob &lhs, const dsn::blob &rhs) const
+    {
+        return dsn::string_view(lhs) == dsn::string_view(rhs);
+    }
+};
+
+int hotkey_fine_data_collector::get_queue_index()
+{
+    if (_hotkey_type == dsn::replication::hotkey_type::WRITE) {
+        return 0;
+    }
+
+    int thread_native_tid = dsn::utils::get_current_tid();
+    auto result = _thread_queue_map.find(thread_native_tid);
+    dassert(result != _thread_queue_map.end(), "Can't find the queue corresponding to the thread");
+    return result->second;
+}
+
+void hotkey_fine_data_collector::analyse_data(detect_hotkey_result &result)
+{
+    std::unordered_map<dsn::blob, uint64_t, blob_hash, blob_equal> hash_key_accessed_cnt;
+    for (auto &rw_queue : _string_capture_queue_vec) {
+        std::pair<dsn::blob, int> hash_key_pair;
+        // prevent endless loop
+        int collect_sum = 0;
+        while (rw_queue.try_dequeue(hash_key_pair) && ++collect_sum <= _max_queue_size) {
+            hash_key_accessed_cnt[hash_key_pair.first] += hash_key_pair.second;
+        }
+    }

Review comment: I just don't understand how `hash_key_accessed_cnt` is calculated here.
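To check my reading: each queue holds the (hash_key, weight) pairs pushed by capture_data(), and this loop just drains every queue (capped at _max_queue_size pops per queue, per the endless-loop guard) and sums the weights per key. A standalone sketch of that aggregation as I understand it, with std::string standing in for dsn::blob and std::deque standing in for moodycamel::ReaderWriterQueue, so none of this is the actual patch code:

```cpp
#include <cstdint>
#include <deque>
#include <iostream>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>

int main()
{
    // One "capture queue" per worker thread; every entry is a (hash_key, weight) pair.
    std::vector<std::deque<std::pair<std::string, uint64_t>>> queues = {
        {{"k1", 1}, {"k2", 1}, {"k1", 1}},
        {{"k1", 1}, {"k3", 1}},
    };

    // Drain every queue and accumulate the weight of each key.
    std::unordered_map<std::string, uint64_t> hash_key_accessed_cnt;
    for (auto &q : queues) {
        while (!q.empty()) {
            hash_key_accessed_cnt[q.front().first] += q.front().second;
            q.pop_front();
        }
    }

    // Expected output (in some order): k1 -> 3, k2 -> 1, k3 -> 1
    for (const auto &kv : hash_key_accessed_cnt) {
        std::cout << kv.first << " -> " << kv.second << "\n";
    }
    return 0;
}
```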
########## File path: src/server/hotkey_collector.cpp ##########
@@ -251,5 +261,107 @@ void hotkey_coarse_data_collector::analyse_data(detect_hotkey_result &result)
     }
 }
+hotkey_fine_data_collector::hotkey_fine_data_collector(
+    replica_base *base,
+    dsn::replication::hotkey_type::type hotkey_type,
+    int target_bucket_index,
+    int max_queue_size)
+    : internal_collector_base(base),
+      _hotkey_type(hotkey_type),
+      _max_queue_size(max_queue_size),
+      _target_bucket_index(target_bucket_index)
+{
+    // Distinguish between single-threaded and multi-threaded environments
+    if (_hotkey_type == dsn::replication::hotkey_type::READ) {
+
+        auto threads = dsn::get_threadpool_threads_info(THREAD_POOL_LOCAL_APP);
+        int queue_num = threads.size();
+
+        _string_capture_queue_vec.reserve(queue_num);
+        for (int i = 0; i < queue_num; i++) {
+            _thread_queue_map.insert(std::make_pair(threads[i]->native_tid(), i));
+
+            // Create a vector of the ReaderWriterQueue whose size = _max_queue_size
+            _string_capture_queue_vec.emplace_back(_max_queue_size);
+        }
+
+    } else { // WRITE
+        _string_capture_queue_vec.emplace_back(_max_queue_size);
+    }
+}
+
+void hotkey_fine_data_collector::capture_data(const dsn::blob &hash_key, uint64_t weight)
+{
+    if (get_bucket_id(hash_key) != _target_bucket_index) {
+        return;
+    }
+    _string_capture_queue_vec[get_queue_index()].try_emplace(std::make_pair(hash_key, weight));
+}
+
+struct blob_hash
+{
+    std::size_t operator()(const dsn::blob &str) const
+    {
+        dsn::string_view cp(str);
+        return boost::hash_range(cp.begin(), cp.end());
+    }
+};
+struct blob_equal
+{
+    std::size_t operator()(const dsn::blob &lhs, const dsn::blob &rhs) const
+    {
+        return dsn::string_view(lhs) == dsn::string_view(rhs);
+    }
+};
+
+int hotkey_fine_data_collector::get_queue_index()
+{
+    if (_hotkey_type == dsn::replication::hotkey_type::WRITE) {
+        return 0;
+    }
+
+    int thread_native_tid = dsn::utils::get_current_tid();
+    auto result = _thread_queue_map.find(thread_native_tid);
+    dassert(result != _thread_queue_map.end(), "Can't find the queue corresponding to the thread");
+    return result->second;
+}
+
+void hotkey_fine_data_collector::analyse_data(detect_hotkey_result &result)
+{
+    std::unordered_map<dsn::blob, uint64_t, blob_hash, blob_equal> hash_key_accessed_cnt;
+    for (auto &rw_queue : _string_capture_queue_vec) {
+        std::pair<dsn::blob, int> hash_key_pair;
+        // prevent endless loop
+        int collect_sum = 0;
+        while (rw_queue.try_dequeue(hash_key_pair) && ++collect_sum <= _max_queue_size) {
+            hash_key_accessed_cnt[hash_key_pair.first] += hash_key_pair.second;
+        }
+    }
+
+    if (hash_key_accessed_cnt.empty()) {
+        return;
+    }
+
+    std::vector<uint64_t> counts;
+    counts.reserve(FLAGS_data_capture_hash_bucket_num);
+    dsn::string_view count_max_key;
+    uint64_t count_max = 0;
+    for (const auto &iter : hash_key_accessed_cnt) {
+        counts.push_back(iter.second);
+        if (iter.second > count_max) {
+            count_max = iter.second;
+            count_max_key = iter.first; // the key with the max accessed count.
+        }
+    }
+
+    // if the accessed counts differ hugely (depends on the variance threshold),
+    // the max key is the hotkey.
+    int hot_index;
+    if (counts.size() < 3 ||
+        find_outlier_index(counts, FLAGS_fine_data_variance_threshold, hot_index)) {
+        result.hot_hash_key = std::string(count_max_key);
+    }

Review comment: What is the code at L357-L363 doing? In my view, the `hot_hash_key` is `count_max_key`, which is already found at L353.
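If I understand it, L357-L363 is a gate: `count_max_key` is only reported as the hotkey when its count clearly stands out from the other counts (or there are too few distinct keys to judge the distribution). A toy stand-in for that idea, definitely not the real find_outlier_index() implementation:

```cpp
#include <algorithm>
#include <cstdint>
#include <vector>

// Toy criterion (made up for illustration): treat the max count as an outlier
// when it exceeds `threshold` times the average of the remaining counts.
bool looks_like_hotkey(const std::vector<uint64_t> &counts, int threshold)
{
    if (counts.size() < 3) {
        // Too few distinct keys to judge the distribution; report the max key anyway,
        // mirroring the `counts.size() < 3 ||` short-circuit in the patch.
        return true;
    }
    uint64_t max_cnt = 0;
    uint64_t sum = 0;
    for (uint64_t c : counts) {
        max_cnt = std::max(max_cnt, c);
        sum += c;
    }
    const double avg_of_others =
        static_cast<double>(sum - max_cnt) / static_cast<double>(counts.size() - 1);
    return static_cast<double>(max_cnt) > threshold * avg_of_others;
}
```

If that is the intent, a short comment here like "only report the hotkey when the max count is a statistical outlier among all counts" would answer this question in the code itself.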