This is an automated email from the ASF dual-hosted git repository.
wutao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git
The following commit(s) were added to refs/heads/master by this push:
new b057848 refactor(collector): sort out the structure of partition
hotspot detection (#597)
b057848 is described below
commit b05784823e5887babd3489fc4ef9c7d7d5d8257c
Author: Smilencer <[email protected]>
AuthorDate: Thu Sep 10 17:59:48 2020 +0800
refactor(collector): sort out the structure of partition hotspot detection
(#597)
---
src/server/hotspot_partition_calculator.cpp | 102 ++++++++++++++++++++++++++
src/server/hotspot_partition_calculator.h | 54 ++++++++++++++
src/server/hotspot_partition_data.h | 9 +--
src/server/info_collector.cpp | 39 +++-------
src/server/info_collector.h | 13 ++--
src/server/table_hotspot_policy.cpp | 40 ----------
src/server/table_hotspot_policy.h | 100 -------------------------
src/server/test/CMakeLists.txt | 2 +-
src/server/test/hotspot_partition_test.cpp | 49 +++++++++++++
src/server/test/pegasus_tablehotspot_test.cpp | 36 ---------
10 files changed, 225 insertions(+), 219 deletions(-)
diff --git a/src/server/hotspot_partition_calculator.cpp
b/src/server/hotspot_partition_calculator.cpp
new file mode 100644
index 0000000..865814e
--- /dev/null
+++ b/src/server/hotspot_partition_calculator.cpp
@@ -0,0 +1,102 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "hotspot_partition_calculator.h"
+
+#include <algorithm>
+#include <math.h>
+#include <dsn/dist/fmt_logging.h>
+#include <dsn/utility/flags.h>
+
+namespace pegasus {
+namespace server {
+
+DSN_DEFINE_int64("pegasus.collector",
+                 max_hotspot_store_size,
+                 100,
+                 "the max count of historical data "
+                 "stored in calculator, The FIFO "
+                 "queue design is used to "
+                 "eliminate outdated historical "
+                 "data");
+
+void hotspot_partition_calculator::data_aggregate(const std::vector<row_data> &partitions)
+{
+    // FIFO eviction: keep at most max(flag, 1) snapshots, counting the one appended below.
+    // Compare as int64 so a flag <= 0 cannot wrap to a huge size_t and grow without bound.
+    const int64_t max_size = std::max<int64_t>(FLAGS_max_hotspot_store_size, 1);
+    while (static_cast<int64_t>(_partition_stat_histories.size()) >= max_size) {
+        _partition_stat_histories.pop();
+    }
+    // TODO refactor the data structure; each row becomes one hotspot_partition_data
+    std::vector<hotspot_partition_data> snapshot(partitions.begin(), partitions.end());
+    _partition_stat_histories.emplace(std::move(snapshot));
+}
+
+void hotspot_partition_calculator::init_perf_counter(int partition_count)
+{
+    // one COUNTER_TYPE_NUMBER counter "app.stat.hotspots@<app_name>.<index>" per partition
+    for (int i = 0; i < partition_count; i++) {
+        const std::string partition_desc = _app_name + '.' + std::to_string(i);
+        const std::string counter_name = fmt::format("app.stat.hotspots@{}", partition_desc);
+        const std::string counter_desc =
+            fmt::format("statistic the hotspots of app {}", partition_desc);
+        _hot_points[i].init_app_counter(
+            "app.pegasus", counter_name.c_str(), COUNTER_TYPE_NUMBER, counter_desc.c_str());
+    }
+}
+
+void hotspot_partition_calculator::data_analyse()
+{
+    dassert(_partition_stat_histories.back().size() == _hot_points.size(),
+            "partition counts error, please check");
+    // PauTa criterion: pool the qps of every partition with qps > 1 across all snapshots
+    std::vector<double> data_samples;
+    data_samples.reserve(_partition_stat_histories.size() * _hot_points.size());
+    auto temp_data = _partition_stat_histories; // std::queue has no iterators, so drain a copy
+    double table_qps_sum = 0;
+    while (!temp_data.empty()) {
+        for (const auto &partition_data : temp_data.front()) {
+            if (partition_data.total_qps - 1.00 > 0) {
+                data_samples.push_back(partition_data.total_qps);
+                table_qps_sum += partition_data.total_qps;
+            }
+        }
+        temp_data.pop();
+    }
+    if (data_samples.empty()) {
+        ddebug("no partition with qps > 1 in _partition_stat_histories");
+        return;
+    }
+    const double table_qps_avg = table_qps_sum / data_samples.size();
+    double variance = 0;
+    for (const double data_sample : data_samples) {
+        variance += (data_sample - table_qps_avg) * (data_sample - table_qps_avg);
+    }
+    const double standard_deviation = sqrt(variance / data_samples.size());
+    const auto &anly_data = _partition_stat_histories.back();
+    for (size_t i = 0; i < _hot_points.size(); i++) {
+        // sd == 0 means every sample equals the average: 0/0 would yield NaN, so score 0
+        const double deviation = anly_data[i].total_qps - table_qps_avg;
+        const double hot_point = standard_deviation > 0 ? deviation / standard_deviation : 0;
+        // perf_counter->set takes an unsigned 64-bit value: clamp negatives to 0, round up
+        _hot_points[i]->set(ceil(std::max(hot_point, 0.0)));
+    }
+}
+
+} // namespace server
+} // namespace pegasus
diff --git a/src/server/hotspot_partition_calculator.h
b/src/server/hotspot_partition_calculator.h
new file mode 100644
index 0000000..c950ebe
--- /dev/null
+++ b/src/server/hotspot_partition_calculator.h
@@ -0,0 +1,54 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "hotspot_partition_data.h"
+#include <gtest/gtest_prod.h>
+#include <dsn/perf_counter/perf_counter.h>
+
+namespace pegasus {
+namespace server {
+
+// hotspot_partition_calculator is used to find the hot partition in a table.
+class hotspot_partition_calculator
+{
+public:
+    hotspot_partition_calculator(const std::string &app_name, int partition_count)
+        : _app_name(app_name), _hot_points(partition_count)
+    {
+        init_perf_counter(partition_count);
+    }
+    // append one qps snapshot of all partitions, evicting the oldest once the store is full
+    void data_aggregate(const std::vector<row_data> &partitions);
+    // score every partition of the latest snapshot; data_aggregate() must have run first
+    void data_analyse();
+
+private:
+    const std::string _app_name;
+
+    void init_perf_counter(int partition_count);
+    // usually a partition with "hot-point value" >= 3 can be considered as a hotspot partition.
+    std::vector<dsn::perf_counter_wrapper> _hot_points;
+    // saving historical data can improve accuracy
+    std::queue<std::vector<hotspot_partition_data>> _partition_stat_histories;
+
+    FRIEND_TEST(hotspot_partition_calculator, hotspot_partition_policy);
+};
+
+} // namespace server
+} // namespace pegasus
diff --git a/src/server/hotspot_partition_data.h
b/src/server/hotspot_partition_data.h
index 2f97c9b..b41b1b1 100644
--- a/src/server/hotspot_partition_data.h
+++ b/src/server/hotspot_partition_data.h
@@ -4,21 +4,16 @@
#pragma once
-#include "shell/commands.h"
+#include "shell/command_helper.h"
namespace pegasus {
namespace server {
struct hotspot_partition_data
{
- hotspot_partition_data(const row_data &row)
- : total_qps(row.get_total_qps()),
- total_cu(row.get_total_cu()),
- partition_name(row.row_name){};
+ hotspot_partition_data(const row_data &row) :
total_qps(row.get_total_qps()){};
hotspot_partition_data() {}
double total_qps;
- double total_cu;
- std::string partition_name;
};
} // namespace server
diff --git a/src/server/info_collector.cpp b/src/server/info_collector.cpp
index 6caea49..0ce2539 100644
--- a/src/server/info_collector.cpp
+++ b/src/server/info_collector.cpp
@@ -14,6 +14,7 @@
#include "base/pegasus_const.h"
#include "result_writer.h"
+#include "hotspot_partition_calculator.h"
using namespace ::dsn;
using namespace ::dsn::replication;
@@ -78,10 +79,6 @@ info_collector::info_collector()
"storage_size_fetch_interval_seconds",
3600, // default value 1h
"storage size fetch interval
seconds");
- _hotspot_detect_algorithm =
dsn_config_get_value_string("pegasus.collector",
-
"hotspot_detect_algorithm",
-
"hotspot_algo_qps_variance",
-
"hotspot_detect_algorithm");
// _storage_size_retry_wait_seconds is in range of [1, 60]
_storage_size_retry_wait_seconds =
std::min(60u, std::max(1u, _storage_size_fetch_interval_seconds / 10));
@@ -96,9 +93,6 @@ info_collector::~info_collector()
for (auto kv : _app_stat_counters) {
delete kv.second;
}
- for (auto store : _hotspot_calculator_store) {
- delete store.second;
- }
}
void info_collector::start()
@@ -150,15 +144,11 @@ void info_collector::on_app_stat()
// get row data statistics for all of the apps
all_stats.merge(app_stats);
- // hotspot_calculator is to detect hotspots
- hotspot_calculator *hotspot_calculator =
+ // hotspot_partition_calculator is used for detecting hotspots
+ auto hotspot_partition_calculator =
get_hotspot_calculator(app_rows.first, app_rows.second.size());
- if (!hotspot_calculator) {
- continue;
- }
- hotspot_calculator->aggregate(app_rows.second);
- // new policy can be designed by strategy pattern in
hotspot_partition_data.h
- hotspot_calculator->start_alg();
+ hotspot_partition_calculator->data_aggregate(app_rows.second);
+ hotspot_partition_calculator->data_analyse();
}
get_app_counters(all_stats.app_name)->set(all_stats);
@@ -302,25 +292,16 @@ void info_collector::on_storage_size_stat(int
remaining_retry_count)
_result_writer->set_result(st_stat.timestamp, "ss",
st_stat.dump_to_json());
}
-hotspot_calculator *info_collector::get_hotspot_calculator(const std::string
&app_name,
- const int
partition_num)
+std::shared_ptr<hotspot_partition_calculator>
+info_collector::get_hotspot_calculator(const std::string &app_name, const int
partition_count)
{
- // use appname+partition_num as a key can prevent the impact of dynamic
partition changes
- std::string app_name_pcount = fmt::format("{}.{}", app_name,
partition_num);
+ // use app_name+partition_count as a key can prevent the impact of dynamic
partition changes
+ std::string app_name_pcount = fmt::format("{}.{}", app_name,
partition_count);
auto iter = _hotspot_calculator_store.find(app_name_pcount);
if (iter != _hotspot_calculator_store.end()) {
return iter->second;
}
- std::unique_ptr<hotspot_policy> policy;
- if (_hotspot_detect_algorithm == "hotspot_algo_qps_variance") {
- policy.reset(new hotspot_algo_qps_variance());
- } else {
- dwarn("hotspot detection is disabled");
- _hotspot_calculator_store[app_name_pcount] = nullptr;
- return nullptr;
- }
- hotspot_calculator *calculator =
- new hotspot_calculator(app_name, partition_num, std::move(policy));
+ auto calculator = std::make_shared<hotspot_partition_calculator>(app_name,
partition_count);
_hotspot_calculator_store[app_name_pcount] = calculator;
return calculator;
}
diff --git a/src/server/info_collector.h b/src/server/info_collector.h
index 05d9929..adc3cd9 100644
--- a/src/server/info_collector.h
+++ b/src/server/info_collector.h
@@ -19,12 +19,12 @@
#include "../shell/commands.h"
#include "table_stats.h"
-#include "table_hotspot_policy.h"
namespace pegasus {
namespace server {
class result_writer;
+class hotspot_partition_calculator;
class info_collector
{
@@ -177,15 +177,16 @@ private:
uint32_t _storage_size_fetch_interval_seconds;
uint32_t _storage_size_retry_wait_seconds;
uint32_t _storage_size_retry_max_count;
- std::string _hotspot_detect_algorithm;
::dsn::task_ptr _storage_size_stat_timer_task;
::dsn::utils::ex_lock_nr _capacity_unit_update_info_lock;
// mapping 'node address' --> 'last updated timestamp'
std::map<std::string, string> _capacity_unit_update_info;
- std::map<std::string, hotspot_calculator *> _hotspot_calculator_store;
-
- hotspot_calculator *get_hotspot_calculator(const std::string &app_name,
- const int partition_num);
+ // _hotspot_calculator_store is to save hotspot_partition_calculator for
each table, a
+ // hotspot_partition_calculator saves historical hotspot data and alert
perf_counters of
+ // corresponding table
+ std::map<std::string, std::shared_ptr<hotspot_partition_calculator>>
_hotspot_calculator_store;
+ std::shared_ptr<hotspot_partition_calculator>
+ get_hotspot_calculator(const std::string &app_name, const int
partition_count);
};
} // namespace server
diff --git a/src/server/table_hotspot_policy.cpp
b/src/server/table_hotspot_policy.cpp
deleted file mode 100644
index ce9cd8d..0000000
--- a/src/server/table_hotspot_policy.cpp
+++ /dev/null
@@ -1,40 +0,0 @@
-// Copyright (c) 2017, Xiaomi, Inc. All rights reserved.
-// This source code is licensed under the Apache License Version 2.0, which
-// can be found in the LICENSE file in the root directory of this source tree.
-
-#include "table_hotspot_policy.h"
-
-#include <dsn/dist/fmt_logging.h>
-
-namespace pegasus {
-namespace server {
-
-void hotspot_calculator::aggregate(const std::vector<row_data> &partitions)
-{
- while (_app_data.size() > kMaxQueueSize - 1) {
- _app_data.pop();
- }
- std::vector<hotspot_partition_data> temp(partitions.size());
- for (int i = 0; i < partitions.size(); i++) {
- temp[i] = std::move(hotspot_partition_data(partitions[i]));
- }
- _app_data.emplace(temp);
-}
-
-void hotspot_calculator::init_perf_counter(const int perf_counter_count)
-{
- std::string counter_name;
- std::string counter_desc;
- for (int i = 0; i < perf_counter_count; i++) {
- string paritition_desc = _app_name + '.' + std::to_string(i);
- counter_name = fmt::format("app.stat.hotspots@{}", paritition_desc);
- counter_desc = fmt::format("statistic the hotspots of app {}",
paritition_desc);
- _points[i].init_app_counter(
- "app.pegasus", counter_name.c_str(), COUNTER_TYPE_NUMBER,
counter_desc.c_str());
- }
-}
-
-void hotspot_calculator::start_alg() { _policy->analysis(_app_data, _points); }
-
-} // namespace server
-} // namespace pegasus
diff --git a/src/server/table_hotspot_policy.h
b/src/server/table_hotspot_policy.h
deleted file mode 100644
index 62b7eb4..0000000
--- a/src/server/table_hotspot_policy.h
+++ /dev/null
@@ -1,100 +0,0 @@
-// Copyright (c) 2017, Xiaomi, Inc. All rights reserved.
-// This source code is licensed under the Apache License Version 2.0, which
-// can be found in the LICENSE file in the root directory of this source tree.
-
-#pragma once
-
-#include "hotspot_partition_data.h"
-
-#include <algorithm>
-#include <gtest/gtest_prod.h>
-#include <math.h>
-
-#include <dsn/perf_counter/perf_counter.h>
-
-namespace pegasus {
-namespace server {
-class hotspot_policy
-{
-public:
- // hotspot_app_data store the historical data which related to hotspot
- // it uses rolling queue to save one app's data
- // vector is used to save the partitions' data of this app
- // hotspot_partition_data is used to save data of one partition
- virtual void analysis(const
std::queue<std::vector<hotspot_partition_data>> &hotspot_app_data,
- std::vector<::dsn::perf_counter_wrapper>
&perf_counters) = 0;
-};
-
-// PauTa Criterion
-class hotspot_algo_qps_variance : public hotspot_policy
-{
-public:
- void analysis(const std::queue<std::vector<hotspot_partition_data>>
&hotspot_app_data,
- std::vector<::dsn::perf_counter_wrapper> &perf_counters)
- {
- dassert(hotspot_app_data.back().size() == perf_counters.size(),
- "partition counts error, please check");
- std::vector<double> data_samples;
- data_samples.reserve(hotspot_app_data.size() * perf_counters.size());
- auto temp_data = hotspot_app_data;
- double total = 0, sd = 0, avg = 0;
- int sample_count = 0;
- // avg: Average number
- // sd: Standard deviation
- // sample_count: Number of samples
- while (!temp_data.empty()) {
- for (auto partition_data : temp_data.front()) {
- if (partition_data.total_qps - 1.00 > 0) {
- data_samples.push_back(partition_data.total_qps);
- total += partition_data.total_qps;
- sample_count++;
- }
- }
- temp_data.pop();
- }
- if (sample_count == 0) {
- ddebug("hotspot_app_data size == 0");
- return;
- }
- avg = total / sample_count;
- for (auto data_sample : data_samples) {
- sd += pow((data_sample - avg), 2);
- }
- sd = sqrt(sd / sample_count);
- const auto &anly_data = hotspot_app_data.back();
- for (int i = 0; i < perf_counters.size(); i++) {
- double hot_point = (anly_data[i].total_qps - avg) / sd;
- // perf_counter->set can only be unsigned __int64
- // use ceil to guarantee conversion results
- hot_point = ceil(std::max(hot_point, double(0)));
- perf_counters[i]->set(hot_point);
- }
- }
-};
-
-// hotspot_calculator is used to find the hotspot in Pegasus
-class hotspot_calculator
-{
-public:
- hotspot_calculator(const std::string &app_name,
- const int partition_num,
- std::unique_ptr<hotspot_policy> policy)
- : _app_name(app_name), _points(partition_num),
_policy(std::move(policy))
- {
- init_perf_counter(partition_num);
- }
- void aggregate(const std::vector<row_data> &partitions);
- void start_alg();
- void init_perf_counter(const int perf_counter_count);
-
-private:
- const std::string _app_name;
- std::vector<::dsn::perf_counter_wrapper> _points;
- std::queue<std::vector<hotspot_partition_data>> _app_data;
- std::unique_ptr<hotspot_policy> _policy;
- static const int kMaxQueueSize = 100;
-
- FRIEND_TEST(table_hotspot_policy, hotspot_algo_qps_variance);
-};
-} // namespace server
-} // namespace pegasus
diff --git a/src/server/test/CMakeLists.txt b/src/server/test/CMakeLists.txt
index 7e70add..0184dbf 100644
--- a/src/server/test/CMakeLists.txt
+++ b/src/server/test/CMakeLists.txt
@@ -8,7 +8,7 @@ set(MY_PROJ_SRC "../pegasus_server_impl.cpp"
"../pegasus_server_write.cpp"
"../capacity_unit_calculator.cpp"
"../pegasus_mutation_duplicator.cpp"
- "../table_hotspot_policy.cpp"
+ "../hotspot_partition_calculator.cpp"
"../meta_store.cpp"
)
diff --git a/src/server/test/hotspot_partition_test.cpp
b/src/server/test/hotspot_partition_test.cpp
new file mode 100644
index 0000000..732a129
--- /dev/null
+++ b/src/server/test/hotspot_partition_test.cpp
@@ -0,0 +1,49 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "server/hotspot_partition_calculator.h"
+
+#include <gtest/gtest.h>
+
+namespace pegasus {
+namespace server {
+
+TEST(hotspot_partition_calculator, hotspot_partition_policy)
+{
+    // TODO: refactor the unit test
+    // seven partitions at 1000 qps plus one outlier at 5000 qps: only the outlier
+    // (the last partition) is expected to get a non-zero hot-point, namely 3
+    std::vector<row_data> test_rows(8);
+    for (auto &row : test_rows) {
+        row.get_qps = 1000.0;
+    }
+    test_rows.back().get_qps = 5000.0;
+
+    hotspot_partition_calculator calculator("TEST", 8);
+    calculator.data_aggregate(test_rows);
+    calculator.data_analyse();
+
+    std::vector<double> result;
+    for (auto &hot_point : calculator._hot_points) {
+        result.push_back(hot_point->get_value());
+    }
+    const std::vector<double> expect_vector{0, 0, 0, 0, 0, 0, 0, 3};
+    ASSERT_EQ(expect_vector, result);
+}
+
+} // namespace server
+} // namespace pegasus
diff --git a/src/server/test/pegasus_tablehotspot_test.cpp
b/src/server/test/pegasus_tablehotspot_test.cpp
deleted file mode 100644
index 63116b2..0000000
--- a/src/server/test/pegasus_tablehotspot_test.cpp
+++ /dev/null
@@ -1,36 +0,0 @@
-// Copyright (c) 2017, Xiaomi, Inc. All rights reserved.
-// This source code is licensed under the Apache License Version 2.0, which
-// can be found in the LICENSE file in the root directory of this source tree.
-
-#include "server/table_hotspot_policy.h"
-
-#include <gtest/gtest.h>
-
-namespace pegasus {
-namespace server {
-
-TEST(table_hotspot_policy, hotspot_algo_qps_variance)
-{
- std::vector<row_data> test_rows(8);
- test_rows[0].get_qps = 1000.0;
- test_rows[1].get_qps = 1000.0;
- test_rows[2].get_qps = 1000.0;
- test_rows[3].get_qps = 1000.0;
- test_rows[4].get_qps = 1000.0;
- test_rows[5].get_qps = 1000.0;
- test_rows[6].get_qps = 1000.0;
- test_rows[7].get_qps = 5000.0;
- std::unique_ptr<hotspot_policy> policy(new hotspot_algo_qps_variance());
- hotspot_calculator test_hotspot_calculator("TEST", 8, std::move(policy));
- test_hotspot_calculator.aggregate(test_rows);
- test_hotspot_calculator.start_alg();
- std::vector<double> result(8);
- for (int i = 0; i < test_hotspot_calculator._points.size(); i++) {
- result[i] = test_hotspot_calculator._points[i]->get_value();
- }
- std::vector<double> expect_vector{0, 0, 0, 0, 0, 0, 0, 3};
- ASSERT_EQ(expect_vector, result);
-}
-
-} // namespace server
-} // namespace pegasus
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]