This is an automated email from the ASF dual-hosted git repository.
alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git
The following commit(s) were added to refs/heads/master by this push:
new 66a79dddf KUDU-3407 Avoid unchecked scheduling of flush operations.
66a79dddf is described below
commit 66a79dddf2faf31dbb49cd8e08c0e322c4bb57ce
Author: 宋家成 <[email protected]>
AuthorDate: Fri Jul 7 14:36:29 2023 +0800
KUDU-3407 Avoid unchecked scheduling of flush operations.
In some clusters, the memory usage of tablet servers might stay between
60% and 80% for a long time. During such periods the maintenance manager
will not run any operation other than WAL GC and MRS/DRS flushes, which
makes the performance of tablet servers worse and worse and can
eventually lead to an OOM crash.

This patch adds a flag that gives other operations a chance to run
while the server is under memory pressure.

The mechanism works when the memory usage is between
memory_pressure_percentage and memory_limit_soft_percentage: the higher
the memory usage, the higher the probability of flushing MRS/DMS.

For example, with
  memory_pressure_percentage = 60%
  memory_limit_soft_percentage = 80%
the probability of not flushing MRS/DMS starts at the value of
run_non_memory_ops_prob and, as memory usage grows, gradually decreases
to 0 when the memory usage reaches 80%.
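To make the decay concrete, here is a worked example of the skip
probability as computed in MaintenanceManager::ProceedWithFlush below
(a sketch assuming run_non_memory_ops_prob = 0.5):

  P(skip flush) = run_non_memory_ops_prob
                  * (soft_limit - usage) / (soft_limit - pressure_threshold)

  usage = 60%: P = 0.5 * (0.80 - 0.60) / (0.80 - 0.60) = 0.50
  usage = 70%: P = 0.5 * (0.80 - 0.70) / (0.80 - 0.60) = 0.25
  usage = 80%: P = 0.5 * (0.80 - 0.80) / (0.80 - 0.60) = 0.00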
Change-Id: Idc2fd3a850cf99d54ef2980211b712468440ed80
Reviewed-on: http://gerrit.cloudera.org:8080/20166
Tested-by: Kudu Jenkins
Reviewed-by: Alexey Serbin <[email protected]>
---
src/kudu/util/maintenance_manager-test.cc | 158 ++++++++++++++++++++++++++++--
src/kudu/util/maintenance_manager.cc | 43 +++++++-
src/kudu/util/maintenance_manager.h | 9 ++
3 files changed, 200 insertions(+), 10 deletions(-)
diff --git a/src/kudu/util/maintenance_manager-test.cc b/src/kudu/util/maintenance_manager-test.cc
index de78f9608..177fca68f 100644
--- a/src/kudu/util/maintenance_manager-test.cc
+++ b/src/kudu/util/maintenance_manager-test.cc
@@ -29,6 +29,7 @@
#include <ostream>
#include <string>
#include <thread>
+#include <type_traits>
#include <utility>
#include <vector>
@@ -44,9 +45,12 @@
#include "kudu/util/maintenance_manager_metrics.h"
#include "kudu/util/metrics.h"
#include "kudu/util/monotime.h"
+#include "kudu/util/random.h"
+#include "kudu/util/random_util.h"
#include "kudu/util/scoped_cleanup.h"
#include "kudu/util/test_macros.h"
#include "kudu/util/test_util.h"
+#include "kudu/util/thread.h"
using std::list;
using std::shared_ptr;
@@ -72,6 +76,10 @@ DECLARE_bool(enable_maintenance_manager);
DECLARE_int64(log_target_replay_size_mb);
DECLARE_double(maintenance_op_multiplier);
DECLARE_int32(max_priority_range);
+DECLARE_double(run_non_memory_ops_prob);
+DECLARE_double(data_gc_prioritization_prob);
+DECLARE_int32(memory_pressure_percentage);
+DECLARE_int32(memory_limit_soft_percentage);
namespace kudu {
// Set this a bit bigger so that the manager could keep track of all possible completed ops.
@@ -89,6 +97,7 @@ class TestMaintenanceOp : public MaintenanceOp {
start_stats_latch_(start_stats_latch),
continue_stats_latch_(continue_stats_latch),
ram_anchored_(500),
+ data_retained_bytes_(0),
logs_retained_bytes_(0),
perf_improvement_(0),
metric_entity_(METRIC_ENTITY_test.Instantiate(&metric_registry_, "test")),
@@ -100,12 +109,18 @@ class TestMaintenanceOp : public MaintenanceOp {
update_stats_time_(MonoDelta::FromSeconds(0)),
priority_(priority),
workload_score_(0),
- update_stats_count_(0) {
+ update_stats_count_(0),
+ update_time_(MonoTime::Now()),
+ queue_time_(MonoDelta::FromSeconds(0)),
+ run_count_(0),
+ updated_(false),
+ register_self_(false) {
}
~TestMaintenanceOp() override = default;
bool Prepare() override {
+ queue_time_ += (MonoTime::Now() - update_time_);
std::lock_guard<simple_spinlock> guard(lock_);
if (remaining_runs_ == 0) {
return false;
@@ -126,13 +141,20 @@ class TestMaintenanceOp : public MaintenanceOp {
CHECK_GE(prepared_runs_, 1);
prepared_runs_--;
}
-
SleepFor(sleep_time_);
-
{
std::lock_guard<simple_spinlock> guard(lock_);
+ run_count_++;
+ updated_ = false;
completed_at_ = MonoTime::Now();
}
+ if (register_self_) {
+ scoped_refptr<kudu::Thread> thread;
+ // Re-register itself after 50ms.
+ kudu::Thread::Create("maintenance-test", "self-register", [this]() {
+ this->set_remaining_runs(1);
+ }, &thread);
+ }
}
void UpdateStats(MaintenanceOpStats* stats) override {
@@ -153,9 +175,13 @@ class TestMaintenanceOp : public MaintenanceOp {
stats->set_runnable(remaining_runs_ > 0);
stats->set_ram_anchored(ram_anchored_);
stats->set_logs_retained_bytes(logs_retained_bytes_);
+ stats->set_data_retained_bytes(data_retained_bytes_);
stats->set_perf_improvement(perf_improvement_);
stats->set_workload_score(workload_score_);
-
+ if (remaining_runs_ > 0 && !updated_) {
+ update_time_ = MonoTime::Now();
+ updated_ = true;
+ }
++update_stats_count_;
}
@@ -184,6 +210,11 @@ class TestMaintenanceOp : public MaintenanceOp {
logs_retained_bytes_ = logs_retained_bytes;
}
+ void set_data_retained_bytes(uint64_t data_retained_bytes) {
+ std::lock_guard<simple_spinlock> guard(lock_);
+ data_retained_bytes_ = data_retained_bytes;
+ }
+
void set_perf_improvement(uint64_t perf_improvement) {
std::lock_guard<simple_spinlock> guard(lock_);
perf_improvement_ = perf_improvement;
@@ -194,6 +225,11 @@ class TestMaintenanceOp : public MaintenanceOp {
workload_score_ = workload_score;
}
+ void set_register_self(bool register_self) {
+ std::lock_guard<simple_spinlock> guard(lock_);
+ register_self_ = register_self;
+ }
+
scoped_refptr<Histogram> DurationHistogram() const override {
return maintenance_op_duration_;
}
@@ -221,6 +257,14 @@ class TestMaintenanceOp : public MaintenanceOp {
return completed_at_;
}
+ int64_t run_count() const {
+ return run_count_;
+ }
+
+ MonoDelta queue_time() const {
+ return queue_time_;
+ }
+
private:
mutable simple_spinlock lock_;
@@ -233,6 +277,7 @@ class TestMaintenanceOp : public MaintenanceOp {
CountDownLatch* continue_stats_latch_;
uint64_t ram_anchored_;
+ uint64_t data_retained_bytes_;
uint64_t logs_retained_bytes_;
uint64_t perf_improvement_;
MetricRegistry metric_registry_;
@@ -262,6 +307,18 @@ class TestMaintenanceOp : public MaintenanceOp {
// Timestamp of the monotonous clock when the operation was completed.
MonoTime completed_at_;
+
+ // Timestamp of the most recent stats update while the op was runnable.
+ MonoTime update_time_;
+ // Total time this op has spent waiting to be scheduled after becoming
+ // runnable.
+ MonoDelta queue_time_;
+ // How many times the operation has been run.
+ int64_t run_count_;
+ // Whether the stats have been updated since the op was last performed.
+ bool updated_;
+ // Whether the op re-registers itself after being performed.
+ bool register_self_;
};
class MaintenanceManagerTest : public KuduTest {
@@ -285,9 +342,20 @@ class MaintenanceManagerTest : public KuduTest {
options.polling_interval_ms = 1;
options.history_size = kHistorySize;
manager_.reset(new MaintenanceManager(options, kFakeUuid, metric_entity_));
+ // Same logic as MaintenanceManager::ProceedWithFlush, but the memory usage
+ // is simulated.
manager_->set_memory_pressure_func_for_tests(
[&](double* /* consumption */) {
- return indicate_memory_pressure_.load();
+ double pressure_ratio = static_cast<double>(FLAGS_memory_pressure_percentage) / 100;
+ if (memory_pressure_pct_.load() >= pressure_ratio) {
+ double pressure_threshold = pressure_ratio;
+ double soft_limit = static_cast<double>(FLAGS_memory_limit_soft_percentage) / 100;
+ return pressure_threshold >= soft_limit || memory_pressure_pct_.load() >= soft_limit ||
+ Random(GetRandomSeed32()).NextDoubleFraction() >=
+ FLAGS_run_non_memory_ops_prob * (soft_limit - memory_pressure_pct_.load()) /
+ (soft_limit - pressure_threshold);
+ }
+ return false;
});
ASSERT_OK(manager_->Start());
}
@@ -317,7 +385,7 @@ class MaintenanceManagerTest : public KuduTest {
scoped_refptr<MetricEntity> metric_entity_;
shared_ptr<MaintenanceManager> manager_;
- std::atomic<bool> indicate_memory_pressure_ { false };
+ std::atomic<double> memory_pressure_pct_ { 0.0 };
};
// Just create the MaintenanceManager and then shut it down, to make sure
@@ -412,7 +480,7 @@ TEST_F(MaintenanceManagerTest, TestMemoryPressurePrioritizesMemory) {
ASSERT_EQ(0, op.DurationHistogram()->TotalCount());
// Fake that the server is under memory pressure.
- indicate_memory_pressure_ = true;
+ memory_pressure_pct_ = 0.7;
ASSERT_EVENTUALLY([&]() {
ASSERT_EQ(op.DurationHistogram()->TotalCount(), 1);
@@ -433,7 +501,7 @@ TEST_F(MaintenanceManagerTest, TestMemoryPressurePerformsNoMemoryOp) {
// Now fake that the server is under memory pressure and make our op runnable
// by giving it a perf score.
- indicate_memory_pressure_ = true;
+ memory_pressure_pct_ = 0.7;
op.set_perf_improvement(1);
// Even though we're under memory pressure, and even though our op doesn't
@@ -513,7 +581,7 @@ TEST_F(MaintenanceManagerTest, TestPrioritizeLogRetentionUnderMemoryPressure) {
op3.set_logs_retained_bytes(99);
op3.set_ram_anchored(101);
- indicate_memory_pressure_ = true;
+ memory_pressure_pct_ = 0.7;
manager_->RegisterOp(&op1);
manager_->RegisterOp(&op2);
manager_->RegisterOp(&op3);
@@ -899,4 +967,76 @@ TEST_F(MaintenanceManagerTest, TestUnregisterWhileScheduling) {
op1.Unregister();
}
+// Show which operation the MaintenanceManager will pick under various workloads and
+// policies. This test does not assert exact counts since it exercises probability flags.
+TEST_F(MaintenanceManagerTest, ComprehensiveTest) {
+ SKIP_IF_SLOW_NOT_ALLOWED();
+
+ // Select policies here.
+ memory_pressure_pct_ = 0.6;
+ FLAGS_run_non_memory_ops_prob = 0.2;
+ FLAGS_data_gc_prioritization_prob = 0.5;
+
+ StopManager();
+
+ TestMaintenanceOp op1("perf_op", MaintenanceOp::HIGH_IO_USAGE);
+ op1.set_perf_improvement(10);
+ op1.set_remaining_runs(1);
+ op1.set_sleep_time(MonoDelta::FromMilliseconds(5));
+ op1.set_register_self(true);
+
+ TestMaintenanceOp op2("memory_op", MaintenanceOp::HIGH_IO_USAGE);
+ op2.set_ram_anchored(2000);
+ op2.set_remaining_runs(1);
+ op2.set_sleep_time(MonoDelta::FromMilliseconds(5));
+ op2.set_register_self(true);
+
+ TestMaintenanceOp op3("data_gc_op", MaintenanceOp::HIGH_IO_USAGE);
+ op3.set_data_retained_bytes(1024 * 1024);
+ op3.set_remaining_runs(1);
+ op3.set_sleep_time(MonoDelta::FromMilliseconds(5));
+ op3.set_register_self(true);
+
+ // Set the number of maintenance threads to 1 to get a clear view of scheduling.
+ NO_FATALS(StartManager(1));
+ FLAGS_enable_maintenance_manager = false;
+ manager_->RegisterOp(&op1);
+ manager_->RegisterOp(&op2);
+ manager_->RegisterOp(&op3);
+ FLAGS_enable_maintenance_manager = true;
+ // Wait for the memory_op to run over 1000 times and then check the run
+ // counts of other operations.
+ AssertEventually([&]() {
+ ASSERT_LE(1000, op2.DurationHistogram()->TotalCount());
+ }, MonoDelta::FromSeconds(60));
+ op1.set_register_self(false);
+ op2.set_register_self(false);
+ op3.set_register_self(false);
+ // Wait until all the operations are done.
+ SleepFor(MonoDelta::FromMilliseconds(100));
+
+ manager_->UnregisterOp(&op1);
+ manager_->UnregisterOp(&op2);
+ manager_->UnregisterOp(&op3);
+
+ // The expected run counts of the operations should satisfy:
+ // other_ops_running_times = memory_op_running_times * probability_not_to_flush
+ // Since the memory usage is 60%, probability_not_to_flush is exactly
+ // FLAGS_run_non_memory_ops_prob. Also, applying and registering might take
+ // time, so other_ops_running_times might be greater than expected.
+ const int64_t memory_op_running_times = op2.run_count();
+ const int64_t other_ops_running_times = op1.run_count() + op3.run_count();
+ ASSERT_LT(memory_op_running_times * 0.15, other_ops_running_times);
+ ASSERT_GT(memory_op_running_times * 0.35, other_ops_running_times);
+
+ LOG(INFO) << Substitute("op1: $0 perform count: $1 average schedule time:
$2",
+ op1.name(), op1.run_count(),
op1.queue_time().ToMilliseconds()
+ / op1.run_count());
+ LOG(INFO) << Substitute("op2: $0 perform count: $1 average schedule time:
$2",
+ op2.name(), op2.run_count(),
op2.queue_time().ToMilliseconds()
+ / op2.run_count());
+ LOG(INFO) << Substitute("op3: $0 perform count: $1 average schedule time:
$2",
+ op3.name(), op3.run_count(),
op3.queue_time().ToMilliseconds()
+ / op3.run_count());
+}
} // namespace kudu
diff --git a/src/kudu/util/maintenance_manager.cc b/src/kudu/util/maintenance_manager.cc
index 56064e3e1..898fb0e3f 100644
--- a/src/kudu/util/maintenance_manager.cc
+++ b/src/kudu/util/maintenance_manager.cc
@@ -101,6 +101,18 @@ DEFINE_double(data_gc_prioritization_prob, 0.5,
"such as delta compaction.");
TAG_FLAG(data_gc_prioritization_prob, experimental);
+DEFINE_double(run_non_memory_ops_prob, 0,
+ "The probability that the tablet server will not flush DRS or MRS "
+ "while under memory pressure. This is useful when the server is "
+ "under memory pressure for a long time and there are non-memory "
+ "operations waiting to be run. A higher value means a higher "
+ "probability of running other ops instead of flush ops. Turning "
+ "this on might be needed if a system admin finds the tablet server "
+ "under memory pressure for a long time with a significant "
+ "degradation in performance.");
+TAG_FLAG(run_non_memory_ops_prob, experimental);
+TAG_FLAG(run_non_memory_ops_prob, runtime);
+
DEFINE_double(maintenance_op_multiplier, 1.1,
"Multiplier applied on different priority levels, table maintenance OPs on level N "
"has multiplier of FLAGS_maintenance_op_multiplier^N, the last score will be "
@@ -121,6 +133,20 @@ DEFINE_int32(maintenance_manager_inject_latency_ms, 0,
TAG_FLAG(maintenance_manager_inject_latency_ms, runtime);
TAG_FLAG(maintenance_manager_inject_latency_ms, unsafe);
+DECLARE_int32(memory_pressure_percentage);
+DECLARE_int32(memory_limit_soft_percentage);
+
+static bool ValidateProbability(const char* flagname, double value) {
+ if (value >= 0.0 && value <= 1.0) {
+ return true;
+ }
+ LOG(ERROR) << Substitute("$0 must be a probability from 0 to 1,"
+ " value $1 is invalid", flagname, value);
+ return false;
+}
+DEFINE_validator(run_non_memory_ops_prob, &ValidateProbability);
+DEFINE_validator(data_gc_prioritization_prob, &ValidateProbability);
+
namespace kudu {
MaintenanceOpStats::MaintenanceOpStats() {
@@ -193,7 +219,9 @@ MaintenanceManager::MaintenanceManager(
: FLAGS_maintenance_manager_history_size),
completed_ops_count_(0),
rand_(GetRandomSeed32()),
- memory_pressure_func_(&process_memory::UnderMemoryPressure),
+ memory_pressure_func_([&](double* consumption) {
+ return this->ProceedWithFlush(consumption);
+ }),
metrics_(CHECK_NOTNULL(metric_entity)) {
CHECK_OK(ThreadPoolBuilder("MaintenanceMgr")
.set_min_threads(num_threads_)
@@ -504,6 +532,9 @@ pair<MaintenanceOp*, string> MaintenanceManager::FindBestOp() {
// are anchoring WALs. Choosing the op that frees the most WALs ensures that
// all ops that anchor memory (and also anchor WALs) will eventually be
// performed.
+ //
+ // Do not always flush MRS/DMS even under memory pressure: some perf
+ // improvement ops might be more important than freeing memory.
double capacity_pct;
if (memory_pressure_func_(&capacity_pct) &&
most_logs_retained_bytes_ram_anchored_op) {
DCHECK_GT(most_logs_retained_bytes_ram_anchored, 0);
@@ -693,4 +724,14 @@ void MaintenanceManager::DecreaseOpCountAndNotifyWaiters(MaintenanceOp* op) {
op->cond_->Signal();
}
+bool MaintenanceManager::ProceedWithFlush(double* used_memory_percentage) {
+ if (process_memory::UnderMemoryPressure(used_memory_percentage)) {
+ double pressure_threshold = static_cast<double>(FLAGS_memory_pressure_percentage) / 100;
+ double soft_limit = static_cast<double>(FLAGS_memory_limit_soft_percentage) / 100;
+ return pressure_threshold >= soft_limit || *used_memory_percentage >= soft_limit ||
+ rand_.NextDoubleFraction() >= FLAGS_run_non_memory_ops_prob *
+ (soft_limit - *used_memory_percentage) / (soft_limit - pressure_threshold);
+ }
+ return false;
+}
} // namespace kudu
diff --git a/src/kudu/util/maintenance_manager.h b/src/kudu/util/maintenance_manager.h
index 2e1f48b9d..6b95d1a18 100644
--- a/src/kudu/util/maintenance_manager.h
+++ b/src/kudu/util/maintenance_manager.h
@@ -376,6 +376,15 @@ class MaintenanceManager : public std::enable_shared_from_this<MaintenanceManage
// 'lock_' is held.
void MergePendingOpRegistrationsUnlocked();
+ /// Determine whether to run flush ops, depending on memory pressure and
+ /// the run_non_memory_ops_prob flag.
+ ///
+ /// @param [out] used_memory_percentage
+ /// The current memory usage percentage.
+ ///
+ /// @return Whether the maintenance manager should pick a flush op to run.
+ bool ProceedWithFlush(double* used_memory_percentage);
+
const std::string server_uuid_;
const int32_t num_threads_;
const MonoDelta polling_interval_;