hycdong commented on a change in pull request #592:
URL: https://github.com/apache/incubator-pegasus/pull/592#discussion_r487625557



##########
File path: src/server/hotspot_partition_calculator.h
##########
@@ -40,14 +47,14 @@ class hotspot_partition_calculator
 
 private:
     const std::string _app_name;
-
     void init_perf_counter(int perf_counter_count);
     // usually a partition with "hot-point value" >= 3 can be considered as a 
hotspot partition.
-    std::vector<dsn::perf_counter_wrapper> _hot_points;
+    hot_partition_counters _hot_points;
     // saving historical data can improve accuracy
-    std::queue<std::vector<hotspot_partition_data>> _partition_stat_histories;
+    stat_histories _partitions_stat_histories;
 
-    FRIEND_TEST(hotspot_partition_calculator, hotspot_partition_policy);
+    friend class hotspot_partition_test;
+    FRIEND_TEST(hotspot_partition_test, hotspot_partition_policy);

Review comment:
       Maybe `FRIEND_TEST(hotspot_partition_test, hotspot_partition_policy);` 
is unnecessary — I notice that this test case doesn't access any private variables.

##########
File path: src/shell/command_helper.h
##########
@@ -549,6 +549,14 @@ struct row_data
 
     double get_total_cu() const { return recent_read_cu + recent_write_cu; }
 
+    double get_total_read_qps() const { return get_qps + multi_get_qps + 
scan_qps; }
+
+    double get_total_write_qps() const

Review comment:
       Why doesn't `total_write_qps` include `incr_qps` and `duplicate_qps`?

##########
File path: src/server/hotspot_partition_calculator.cpp
##########
@@ -34,67 +35,84 @@ DSN_DEFINE_int64("pegasus.collector",
                  "eliminate outdated historical "
                  "data");
 
-void hotspot_partition_calculator::data_aggregate(const std::vector<row_data> 
&partitions)
+void hotspot_partition_calculator::data_aggregate(const std::vector<row_data> 
&partition_stats)
 {
-    while (_partition_stat_histories.size() > FLAGS_max_hotspot_store_size - 
1) {
-        _partition_stat_histories.pop();
+    while (_partitions_stat_histories.size() >= FLAGS_max_hotspot_store_size) {
+        _partitions_stat_histories.pop_front();
     }
-    std::vector<hotspot_partition_data> temp(partitions.size());
-    // TODO refactor the data structure
-    for (int i = 0; i < partitions.size(); i++) {
-        temp[i] = std::move(hotspot_partition_data(partitions[i]));
+    std::vector<hotspot_partition_stat> temp;
+    for (const auto &partition_stat : partition_stats) {
+        temp.emplace_back(hotspot_partition_stat(partition_stat));
     }
-    _partition_stat_histories.emplace(temp);
+    _partitions_stat_histories.emplace_back(temp);
 }
 
 void hotspot_partition_calculator::init_perf_counter(int partition_count)
 {
-    std::string counter_name;
-    std::string counter_desc;
+    std::string read_counter_name, write_counter_name;
+    std::string read_counter_desc, write_counter_desc;
     for (int i = 0; i < partition_count; i++) {
-        string partition_desc = _app_name + '.' + std::to_string(i);
-        counter_name = fmt::format("app.stat.hotspots@{}", partition_desc);
-        counter_desc = fmt::format("statistic the hotspots of app {}", 
partition_desc);
-        _hot_points[i].init_app_counter(
-            "app.pegasus", counter_name.c_str(), COUNTER_TYPE_NUMBER, 
counter_desc.c_str());
+        string read_partition_desc = _app_name + '.' + "read." + 
std::to_string(i);
+        read_counter_name = fmt::format("app.stat.hotspots@{}", 
read_partition_desc);
+        read_counter_desc = fmt::format("statistic the hotspots of app {}", 
read_partition_desc);
+        
_hot_points[i].emplace_back(std::make_unique<dsn::perf_counter_wrapper>());
+        _hot_points[i][READ_HOTSPOT_DATA]->init_app_counter("app.pegasus",
+                                                            
read_counter_name.c_str(),
+                                                            
COUNTER_TYPE_NUMBER,
+                                                            
read_counter_desc.c_str());
+        string write_partition_desc = _app_name + '.' + "write." + 
std::to_string(i);
+        write_counter_name = fmt::format("app.stat.hotspots@{}", 
write_partition_desc);
+        write_counter_desc = fmt::format("statistic the hotspots of app {}", 
write_partition_desc);
+        
_hot_points[i].emplace_back(std::make_unique<dsn::perf_counter_wrapper>());
+        _hot_points[i][WRITE_HOTSPOT_DATA]->init_app_counter("app.pegasus",
+                                                             
write_counter_name.c_str(),
+                                                             
COUNTER_TYPE_NUMBER,
+                                                             
write_counter_desc.c_str());
     }
 }
 
 void hotspot_partition_calculator::data_analyse()
 {
-    dassert(_partition_stat_histories.back().size() == _hot_points.size(),
-            "partition counts error, please check");
-    std::vector<double> data_samples;
-    data_samples.reserve(_partition_stat_histories.size() * 
_hot_points.size());
-    auto temp_data = _partition_stat_histories;
-    double table_qps_sum = 0, standard_deviation = 0, table_qps_avg = 0;
-    int sample_count = 0;
-    while (!temp_data.empty()) {
-        for (const auto &partition_data : temp_data.front()) {
-            if (partition_data.total_qps - 1.00 > 0) {
-                data_samples.push_back(partition_data.total_qps);
-                table_qps_sum += partition_data.total_qps;
-                sample_count++;
+    dcheck_eq(_partitions_stat_histories.back().size(), _hot_points.size());
+    for (int data_type = 0; data_type <= 1; data_type++) {
+        // 0: READ_HOTSPOT_DATA; 1: WRITE_HOTSPOT_DATA
+        double table_qps_sum = 0, standard_deviation = 0, table_qps_avg = 0;
+        int sample_count = 0;
+        for (const auto &one_partition_stat_histories : 
_partitions_stat_histories) {
+            for (const auto &partition_stat : one_partition_stat_histories) {
+                if (partition_stat.total_qps[data_type] > 1.00) {
+                    table_qps_sum += partition_stat.total_qps[data_type];
+                    sample_count++;
+                }
             }
         }
-        temp_data.pop();
-    }
-    if (sample_count == 0) {
-        ddebug("_partition_stat_histories size == 0");
-        return;
-    }
-    table_qps_avg = table_qps_sum / sample_count;
-    for (const auto &data_sample : data_samples) {
-        standard_deviation += pow((data_sample - table_qps_avg), 2);
-    }
-    standard_deviation = sqrt(standard_deviation / sample_count);
-    const auto &anly_data = _partition_stat_histories.back();
-    for (int i = 0; i < _hot_points.size(); i++) {
-        double hot_point = (anly_data[i].total_qps - table_qps_avg) / 
standard_deviation;
-        // perf_counter->set can only be unsigned __int64
-        // use ceil to guarantee conversion results
-        hot_point = ceil(std::max(hot_point, double(0)));
-        _hot_points[i]->set(hot_point);
+
+        if (sample_count <= 1) {
+            ddebug("_partitions_stat_histories size <= 1");
+            return;
+        }
+        table_qps_avg = table_qps_sum / sample_count;
+        for (const auto &one_partition_stat_histories : 
_partitions_stat_histories) {
+            for (const auto &partition_stat : one_partition_stat_histories) {
+                if (partition_stat.total_qps[data_type] > 1.00) {
+                    standard_deviation +=
+                        pow((partition_stat.total_qps[data_type] - 
table_qps_avg), 2);
+                }
+            }
+        }
+        standard_deviation = sqrt(standard_deviation / (sample_count - 1));
+        const auto &anly_data = _partitions_stat_histories.back();
+        for (int i = 0; i < _hot_points.size(); i++) {
+            double hot_point = 0;
+            if (standard_deviation != 0) {
+                hot_point =
+                    (anly_data[i].total_qps[data_type] - table_qps_avg) / 
standard_deviation;
+            }
+            // perf_counter->set can only be unsigned uint64_t
+            // use ceil to guarantee conversion results
+            hot_point = ceil(std::max(hot_point, double(0)));
+            _hot_points[i][data_type]->get()->set(hot_point);
+        }

Review comment:
       I think this `data_analyse` can be refactored substantially.
   
   This function first calculates QPS statistics, then updates the hot points 
according to `_partitions_stat_histories.back()`. You could separate those 
two steps to make `data_analyse` clearer, for example:
   ```
   void hotspot_partition_calculator::data_analyse()
   {
        // your original dcheck_eq();
        for (int data_type = 0; data_type <= 1; data_type++) {
             double standard_deviation = 0, table_qps_avg = 0;
             if(get_hotpot_statistics(data_type, table_qps_avg, 
standard_deviation))
             {
                 update_hot_point(_partitions_stat_histories.back(), data_type, 
table_qps_avg, standard_deviation);
             }
        }
        
   }
   
   bool hotspot_partition_calculator::get_hotpot_statistics(partition_qps_type 
data_type, double &qps_avg, 
   double &standard_deviation)
   {
        // calculate qps_avg and standard_deviation
        // if `sample_count <= 1`, there is no need to calculate statistics, 
return false
        // otherwise return true 
   }
   
   void 
hotspot_partition_calculator::update_hot_point(std::vector<hotspot_partition_stat>
 cur_stat, 
   partition_qps_type data_type, double qps_avg, double standard_deviation)
   {
        // update hot points
   }
   ```
   This is just a simple refactoring suggestion; feel free to propose a better 
solution that makes the code clearer.
   
   

##########
File path: src/server/hotspot_partition_calculator.cpp
##########
@@ -34,67 +35,84 @@ DSN_DEFINE_int64("pegasus.collector",
                  "eliminate outdated historical "
                  "data");
 
-void hotspot_partition_calculator::data_aggregate(const std::vector<row_data> 
&partitions)
+void hotspot_partition_calculator::data_aggregate(const std::vector<row_data> 
&partition_stats)
 {
-    while (_partition_stat_histories.size() > FLAGS_max_hotspot_store_size - 
1) {
-        _partition_stat_histories.pop();
+    while (_partitions_stat_histories.size() >= FLAGS_max_hotspot_store_size) {
+        _partitions_stat_histories.pop_front();
     }
-    std::vector<hotspot_partition_data> temp(partitions.size());
-    // TODO refactor the data structure
-    for (int i = 0; i < partitions.size(); i++) {
-        temp[i] = std::move(hotspot_partition_data(partitions[i]));
+    std::vector<hotspot_partition_stat> temp;
+    for (const auto &partition_stat : partition_stats) {
+        temp.emplace_back(hotspot_partition_stat(partition_stat));
     }
-    _partition_stat_histories.emplace(temp);
+    _partitions_stat_histories.emplace_back(temp);
 }
 
 void hotspot_partition_calculator::init_perf_counter(int partition_count)
 {
-    std::string counter_name;
-    std::string counter_desc;
+    std::string read_counter_name, write_counter_name;
+    std::string read_counter_desc, write_counter_desc;
     for (int i = 0; i < partition_count; i++) {
-        string partition_desc = _app_name + '.' + std::to_string(i);
-        counter_name = fmt::format("app.stat.hotspots@{}", partition_desc);
-        counter_desc = fmt::format("statistic the hotspots of app {}", 
partition_desc);
-        _hot_points[i].init_app_counter(
-            "app.pegasus", counter_name.c_str(), COUNTER_TYPE_NUMBER, 
counter_desc.c_str());
+        string read_partition_desc = _app_name + '.' + "read." + 
std::to_string(i);
+        read_counter_name = fmt::format("app.stat.hotspots@{}", 
read_partition_desc);
+        read_counter_desc = fmt::format("statistic the hotspots of app {}", 
read_partition_desc);
+        
_hot_points[i].emplace_back(std::make_unique<dsn::perf_counter_wrapper>());
+        _hot_points[i][READ_HOTSPOT_DATA]->init_app_counter("app.pegasus",
+                                                            
read_counter_name.c_str(),
+                                                            
COUNTER_TYPE_NUMBER,
+                                                            
read_counter_desc.c_str());
+        string write_partition_desc = _app_name + '.' + "write." + 
std::to_string(i);
+        write_counter_name = fmt::format("app.stat.hotspots@{}", 
write_partition_desc);
+        write_counter_desc = fmt::format("statistic the hotspots of app {}", 
write_partition_desc);
+        
_hot_points[i].emplace_back(std::make_unique<dsn::perf_counter_wrapper>());
+        _hot_points[i][WRITE_HOTSPOT_DATA]->init_app_counter("app.pegasus",
+                                                             
write_counter_name.c_str(),
+                                                             
COUNTER_TYPE_NUMBER,
+                                                             
write_counter_desc.c_str());
     }
 }
 
 void hotspot_partition_calculator::data_analyse()
 {
-    dassert(_partition_stat_histories.back().size() == _hot_points.size(),
-            "partition counts error, please check");
-    std::vector<double> data_samples;
-    data_samples.reserve(_partition_stat_histories.size() * 
_hot_points.size());
-    auto temp_data = _partition_stat_histories;
-    double table_qps_sum = 0, standard_deviation = 0, table_qps_avg = 0;
-    int sample_count = 0;
-    while (!temp_data.empty()) {
-        for (const auto &partition_data : temp_data.front()) {
-            if (partition_data.total_qps - 1.00 > 0) {
-                data_samples.push_back(partition_data.total_qps);
-                table_qps_sum += partition_data.total_qps;
-                sample_count++;
+    dcheck_eq(_partitions_stat_histories.back().size(), _hot_points.size());
+    for (int data_type = 0; data_type <= 1; data_type++) {
+        // 0: READ_HOTSPOT_DATA; 1: WRITE_HOTSPOT_DATA
+        double table_qps_sum = 0, standard_deviation = 0, table_qps_avg = 0;
+        int sample_count = 0;
+        for (const auto &one_partition_stat_histories : 
_partitions_stat_histories) {
+            for (const auto &partition_stat : one_partition_stat_histories) {
+                if (partition_stat.total_qps[data_type] > 1.00) {
+                    table_qps_sum += partition_stat.total_qps[data_type];
+                    sample_count++;
+                }
             }
         }
-        temp_data.pop();
-    }
-    if (sample_count == 0) {
-        ddebug("_partition_stat_histories size == 0");
-        return;
-    }
-    table_qps_avg = table_qps_sum / sample_count;
-    for (const auto &data_sample : data_samples) {
-        standard_deviation += pow((data_sample - table_qps_avg), 2);
-    }
-    standard_deviation = sqrt(standard_deviation / sample_count);
-    const auto &anly_data = _partition_stat_histories.back();
-    for (int i = 0; i < _hot_points.size(); i++) {
-        double hot_point = (anly_data[i].total_qps - table_qps_avg) / 
standard_deviation;
-        // perf_counter->set can only be unsigned __int64
-        // use ceil to guarantee conversion results
-        hot_point = ceil(std::max(hot_point, double(0)));
-        _hot_points[i]->set(hot_point);
+
+        if (sample_count <= 1) {
+            ddebug("_partitions_stat_histories size <= 1");

Review comment:
       This log message is a bit too terse; it can't explain why the function returns 
here.

##########
File path: src/server/hotspot_partition_calculator.cpp
##########
@@ -34,67 +35,84 @@ DSN_DEFINE_int64("pegasus.collector",
                  "eliminate outdated historical "
                  "data");
 
-void hotspot_partition_calculator::data_aggregate(const std::vector<row_data> 
&partitions)
+void hotspot_partition_calculator::data_aggregate(const std::vector<row_data> 
&partition_stats)
 {
-    while (_partition_stat_histories.size() > FLAGS_max_hotspot_store_size - 
1) {
-        _partition_stat_histories.pop();
+    while (_partitions_stat_histories.size() >= FLAGS_max_hotspot_store_size) {
+        _partitions_stat_histories.pop_front();
     }
-    std::vector<hotspot_partition_data> temp(partitions.size());
-    // TODO refactor the data structure
-    for (int i = 0; i < partitions.size(); i++) {
-        temp[i] = std::move(hotspot_partition_data(partitions[i]));
+    std::vector<hotspot_partition_stat> temp;
+    for (const auto &partition_stat : partition_stats) {
+        temp.emplace_back(hotspot_partition_stat(partition_stat));
     }
-    _partition_stat_histories.emplace(temp);
+    _partitions_stat_histories.emplace_back(temp);
 }
 
 void hotspot_partition_calculator::init_perf_counter(int partition_count)
 {
-    std::string counter_name;
-    std::string counter_desc;
+    std::string read_counter_name, write_counter_name;
+    std::string read_counter_desc, write_counter_desc;
     for (int i = 0; i < partition_count; i++) {
-        string partition_desc = _app_name + '.' + std::to_string(i);
-        counter_name = fmt::format("app.stat.hotspots@{}", partition_desc);
-        counter_desc = fmt::format("statistic the hotspots of app {}", 
partition_desc);
-        _hot_points[i].init_app_counter(
-            "app.pegasus", counter_name.c_str(), COUNTER_TYPE_NUMBER, 
counter_desc.c_str());
+        string read_partition_desc = _app_name + '.' + "read." + 
std::to_string(i);
+        read_counter_name = fmt::format("app.stat.hotspots@{}", 
read_partition_desc);
+        read_counter_desc = fmt::format("statistic the hotspots of app {}", 
read_partition_desc);
+        
_hot_points[i].emplace_back(std::make_unique<dsn::perf_counter_wrapper>());
+        _hot_points[i][READ_HOTSPOT_DATA]->init_app_counter("app.pegasus",
+                                                            
read_counter_name.c_str(),
+                                                            
COUNTER_TYPE_NUMBER,
+                                                            
read_counter_desc.c_str());
+        string write_partition_desc = _app_name + '.' + "write." + 
std::to_string(i);
+        write_counter_name = fmt::format("app.stat.hotspots@{}", 
write_partition_desc);
+        write_counter_desc = fmt::format("statistic the hotspots of app {}", 
write_partition_desc);
+        
_hot_points[i].emplace_back(std::make_unique<dsn::perf_counter_wrapper>());
+        _hot_points[i][WRITE_HOTSPOT_DATA]->init_app_counter("app.pegasus",
+                                                             
write_counter_name.c_str(),
+                                                             
COUNTER_TYPE_NUMBER,
+                                                             
write_counter_desc.c_str());
     }
 }
 
 void hotspot_partition_calculator::data_analyse()
 {
-    dassert(_partition_stat_histories.back().size() == _hot_points.size(),
-            "partition counts error, please check");
-    std::vector<double> data_samples;
-    data_samples.reserve(_partition_stat_histories.size() * 
_hot_points.size());
-    auto temp_data = _partition_stat_histories;
-    double table_qps_sum = 0, standard_deviation = 0, table_qps_avg = 0;
-    int sample_count = 0;
-    while (!temp_data.empty()) {
-        for (const auto &partition_data : temp_data.front()) {
-            if (partition_data.total_qps - 1.00 > 0) {
-                data_samples.push_back(partition_data.total_qps);
-                table_qps_sum += partition_data.total_qps;
-                sample_count++;
+    dcheck_eq(_partitions_stat_histories.back().size(), _hot_points.size());

Review comment:
       Are `_partitions_stat_histories.back().size()` and `_hot_points.size()` 
both expected to be equal to the app's partition_count? Besides, please add a log 
message for this assert.

##########
File path: src/server/hotspot_partition_stat.h
##########
@@ -0,0 +1,30 @@
+// Copyright (c) 2017, Xiaomi, Inc.  All rights reserved.
+// This source code is licensed under the Apache License Version 2.0, which
+// can be found in the LICENSE file in the root directory of this source tree.

Review comment:
       Update license

##########
File path: src/server/hotspot_partition_calculator.cpp
##########
@@ -34,67 +35,84 @@ DSN_DEFINE_int64("pegasus.collector",
                  "eliminate outdated historical "
                  "data");
 
-void hotspot_partition_calculator::data_aggregate(const std::vector<row_data> 
&partitions)
+void hotspot_partition_calculator::data_aggregate(const std::vector<row_data> 
&partition_stats)
 {
-    while (_partition_stat_histories.size() > FLAGS_max_hotspot_store_size - 
1) {
-        _partition_stat_histories.pop();
+    while (_partitions_stat_histories.size() >= FLAGS_max_hotspot_store_size) {
+        _partitions_stat_histories.pop_front();
     }
-    std::vector<hotspot_partition_data> temp(partitions.size());
-    // TODO refactor the data structure
-    for (int i = 0; i < partitions.size(); i++) {
-        temp[i] = std::move(hotspot_partition_data(partitions[i]));
+    std::vector<hotspot_partition_stat> temp;
+    for (const auto &partition_stat : partition_stats) {
+        temp.emplace_back(hotspot_partition_stat(partition_stat));

Review comment:
       Will `partition_stats` size always be equal to app partition_count?

##########
File path: src/server/hotspot_partition_calculator.h
##########
@@ -40,14 +47,14 @@ class hotspot_partition_calculator
 
 private:
     const std::string _app_name;
-
     void init_perf_counter(int perf_counter_count);
     // usually a partition with "hot-point value" >= 3 can be considered as a 
hotspot partition.

Review comment:
       I don't understand this comment; could you please explain it to me?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to