This is an automated email from the ASF dual-hosted git repository.
adar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git
The following commit(s) were added to refs/heads/master by this push:
new c584d0d [maintenance] Support priorities for tables in MM compaction
c584d0d is described below
commit c584d0def49ac545121247794d80c8e0fefd8f6c
Author: Yingchun Lai <[email protected]>
AuthorDate: Mon Mar 25 23:35:01 2019 -0400
[maintenance] Support priorities for tables in MM compaction
This commit adds a feature to specify different priorities for table
compaction.
In a Kudu cluster with thousands of tables, it's hard for a given tablet's
maintenance OPs to be launched when their scores are not the highest, even
if the table the tablet belongs to is a high priority for Kudu users. This
patch allows administrators to specify different priorities for tables via
gflags, so that the maintenance OPs of high-priority tables have a greater
chance of being launched.
Change-Id: I3ea3b73505157678a8fb551656123b64e6bfb304
Reviewed-on: http://gerrit.cloudera.org:8080/12852
Tested-by: Adar Dembo <[email protected]>
Reviewed-by: Adar Dembo <[email protected]>
---
src/kudu/tablet/tablet.h | 1 +
src/kudu/tablet/tablet_metadata.h | 2 +-
src/kudu/tablet/tablet_mm_ops-test.cc | 3 +-
src/kudu/tablet/tablet_mm_ops.cc | 15 ++-
src/kudu/tablet/tablet_mm_ops.h | 33 ++---
src/kudu/tablet/tablet_replica.cc | 2 +-
src/kudu/tablet/tablet_replica.h | 7 +-
src/kudu/tablet/tablet_replica_mm_ops.cc | 23 +++-
src/kudu/tablet/tablet_replica_mm_ops.h | 67 +++++-----
src/kudu/util/maintenance_manager-test.cc | 177 +++++++++++++++++++++++----
src/kudu/util/maintenance_manager.cc | 197 +++++++++++++++++++++---------
src/kudu/util/maintenance_manager.h | 23 +++-
src/kudu/util/maintenance_manager.proto | 11 +-
13 files changed, 408 insertions(+), 153 deletions(-)
diff --git a/src/kudu/tablet/tablet.h b/src/kudu/tablet/tablet.h
index 05c7c9b..c4a5691 100644
--- a/src/kudu/tablet/tablet.h
+++ b/src/kudu/tablet/tablet.h
@@ -417,6 +417,7 @@ class Tablet {
// This method is thread-safe.
void CancelMaintenanceOps();
+ const std::string& table_id() const { return metadata_->table_id(); }
const std::string& tablet_id() const { return metadata_->tablet_id(); }
// Return the metrics for this tablet.
diff --git a/src/kudu/tablet/tablet_metadata.h b/src/kudu/tablet/tablet_metadata.h
index 62e7545..adf0d4c 100644
--- a/src/kudu/tablet/tablet_metadata.h
+++ b/src/kudu/tablet/tablet_metadata.h
@@ -119,7 +119,7 @@ class TabletMetadata : public RefCountedThreadSafe<TabletMetadata> {
return partition_;
}
- std::string table_id() const {
+ const std::string& table_id() const {
DCHECK_NE(state_, kNotLoadedYet);
return table_id_;
}
diff --git a/src/kudu/tablet/tablet_mm_ops-test.cc b/src/kudu/tablet/tablet_mm_ops-test.cc
index a2f5572..3a13386 100644
--- a/src/kudu/tablet/tablet_mm_ops-test.cc
+++ b/src/kudu/tablet/tablet_mm_ops-test.cc
@@ -62,7 +62,7 @@ class KuduTabletMmOpsTest : public TabletTestBase<IntKeyTestSetup<INT64>> {
void StatsShouldChange(MaintenanceOp* op) {
SleepFor(MonoDelta::FromMilliseconds(1));
op->UpdateStats(&stats_);
- ASSERT_TRUE(next_time_ < stats_.last_modified());
+ ASSERT_LT(next_time_, stats_.last_modified());
next_time_ = stats_.last_modified();
}
@@ -70,7 +70,6 @@ class KuduTabletMmOpsTest : public TabletTestBase<IntKeyTestSetup<INT64>> {
SleepFor(MonoDelta::FromMilliseconds(1));
op->UpdateStats(&stats_);
ASSERT_EQ(next_time_, stats_.last_modified());
- next_time_ = stats_.last_modified();
}
void TestFirstCall(MaintenanceOp* op) {
diff --git a/src/kudu/tablet/tablet_mm_ops.cc b/src/kudu/tablet/tablet_mm_ops.cc
index c71bc6e..5948061 100644
--- a/src/kudu/tablet/tablet_mm_ops.cc
+++ b/src/kudu/tablet/tablet_mm_ops.cc
@@ -24,6 +24,7 @@
#include <gflags/gflags.h>
#include <glog/logging.h>
+#include "kudu/gutil/port.h"
#include "kudu/gutil/strings/substitute.h"
#include "kudu/tablet/rowset.h"
#include "kudu/tablet/tablet.h"
@@ -87,6 +88,10 @@ string TabletOpBase::LogPrefix() const {
return tablet_->LogPrefix();
}
+const std::string& TabletOpBase::table_id() const {
+ return tablet_->table_id();
+}
+
////////////////////////////////////////////////////////////
// CompactRowSetsOp
////////////////////////////////////////////////////////////
@@ -262,12 +267,12 @@ void MajorDeltaCompactionOp::UpdateStats(MaintenanceOpStats* stats) {
// cached stats.
TabletMetrics* metrics = tablet_->metrics();
if (metrics) {
- int64_t new_num_mrs_flushed = metrics->flush_mrs_duration->TotalCount();
- int64_t new_num_dms_flushed = metrics->flush_dms_duration->TotalCount();
- int64_t new_num_rs_compacted = metrics->compact_rs_duration->TotalCount();
- int64_t new_num_rs_minor_delta_compacted =
+ uint64_t new_num_mrs_flushed = metrics->flush_mrs_duration->TotalCount();
+ uint64_t new_num_dms_flushed = metrics->flush_dms_duration->TotalCount();
+ uint64_t new_num_rs_compacted = metrics->compact_rs_duration->TotalCount();
+ uint64_t new_num_rs_minor_delta_compacted =
metrics->delta_minor_compact_rs_duration->TotalCount();
- int64_t new_num_rs_major_delta_compacted =
+ uint64_t new_num_rs_major_delta_compacted =
metrics->delta_major_compact_rs_duration->TotalCount();
if (prev_stats_.valid() &&
new_num_mrs_flushed == last_num_mrs_flushed_ &&
diff --git a/src/kudu/tablet/tablet_mm_ops.h b/src/kudu/tablet/tablet_mm_ops.h
index 8fc865e..6180652 100644
--- a/src/kudu/tablet/tablet_mm_ops.h
+++ b/src/kudu/tablet/tablet_mm_ops.h
@@ -22,7 +22,6 @@
#include <string>
#include "kudu/gutil/macros.h"
-#include "kudu/gutil/port.h"
#include "kudu/gutil/ref_counted.h"
#include "kudu/util/locks.h"
#include "kudu/util/maintenance_manager.h"
@@ -43,6 +42,8 @@ class TabletOpBase : public MaintenanceOp {
std::string LogPrefix() const;
protected:
+ const std::string& table_id() const override;
+
Tablet* const tablet_;
};
@@ -57,15 +58,15 @@ class CompactRowSetsOp : public TabletOpBase {
public:
explicit CompactRowSetsOp(Tablet* tablet);
- virtual void UpdateStats(MaintenanceOpStats* stats) OVERRIDE;
+ void UpdateStats(MaintenanceOpStats* stats) override;
- virtual bool Prepare() OVERRIDE;
+ bool Prepare() override;
- virtual void Perform() OVERRIDE;
+ void Perform() override;
- virtual scoped_refptr<Histogram> DurationHistogram() const OVERRIDE;
+ scoped_refptr<Histogram> DurationHistogram() const override;
- virtual scoped_refptr<AtomicGauge<uint32_t> > RunningGauge() const OVERRIDE;
+ scoped_refptr<AtomicGauge<uint32_t> > RunningGauge() const override;
private:
mutable simple_spinlock lock_;
@@ -83,15 +84,15 @@ class MinorDeltaCompactionOp : public TabletOpBase {
public:
explicit MinorDeltaCompactionOp(Tablet* tablet);
- virtual void UpdateStats(MaintenanceOpStats* stats) OVERRIDE;
+ void UpdateStats(MaintenanceOpStats* stats) override;
- virtual bool Prepare() OVERRIDE;
+ bool Prepare() override;
- virtual void Perform() OVERRIDE;
+ void Perform() override;
- virtual scoped_refptr<Histogram> DurationHistogram() const OVERRIDE;
+ scoped_refptr<Histogram> DurationHistogram() const override;
- virtual scoped_refptr<AtomicGauge<uint32_t> > RunningGauge() const OVERRIDE;
+ scoped_refptr<AtomicGauge<uint32_t> > RunningGauge() const override;
private:
mutable simple_spinlock lock_;
@@ -109,15 +110,15 @@ class MajorDeltaCompactionOp : public TabletOpBase {
public:
explicit MajorDeltaCompactionOp(Tablet* tablet);
- virtual void UpdateStats(MaintenanceOpStats* stats) OVERRIDE;
+ void UpdateStats(MaintenanceOpStats* stats) override;
- virtual bool Prepare() OVERRIDE;
+ bool Prepare() override;
- virtual void Perform() OVERRIDE;
+ void Perform() override;
- virtual scoped_refptr<Histogram> DurationHistogram() const OVERRIDE;
+ scoped_refptr<Histogram> DurationHistogram() const override;
- virtual scoped_refptr<AtomicGauge<uint32_t> > RunningGauge() const OVERRIDE;
+ scoped_refptr<AtomicGauge<uint32_t> > RunningGauge() const override;
private:
mutable simple_spinlock lock_;
diff --git a/src/kudu/tablet/tablet_replica.cc b/src/kudu/tablet/tablet_replica.cc
index c7b231b..a284215 100644
--- a/src/kudu/tablet/tablet_replica.cc
+++ b/src/kudu/tablet/tablet_replica.cc
@@ -38,6 +38,7 @@
#include "kudu/consensus/log_anchor_registry.h"
#include "kudu/consensus/opid.pb.h"
#include "kudu/consensus/raft_consensus.h"
+#include "kudu/fs/data_dirs.h"
#include "kudu/gutil/basictypes.h"
#include "kudu/gutil/bind.h"
#include "kudu/gutil/bind_helpers.h"
@@ -124,7 +125,6 @@ TabletReplica::TabletReplica(
Callback<void(const std::string& reason)> mark_dirty_clbk)
: meta_(DCHECK_NOTNULL(std::move(meta))),
cmeta_manager_(DCHECK_NOTNULL(std::move(cmeta_manager))),
- tablet_id_(meta_->tablet_id()),
local_peer_pb_(std::move(local_peer_pb)),
log_anchor_registry_(new LogAnchorRegistry()),
apply_pool_(apply_pool),
diff --git a/src/kudu/tablet/tablet_replica.h b/src/kudu/tablet/tablet_replica.h
index bf9d63d..50fab2d 100644
--- a/src/kudu/tablet/tablet_replica.h
+++ b/src/kudu/tablet/tablet_replica.h
@@ -254,10 +254,8 @@ class TabletReplica : public RefCountedThreadSafe<TabletReplica>,
return log_anchor_registry_;
}
- // Returns the tablet_id of the tablet managed by this TabletReplica.
- // Returns the correct tablet_id even if the underlying tablet is not available
- // yet.
- const std::string& tablet_id() const { return tablet_id_; }
+ const std::string& table_id() const { return meta_->table_id(); }
+ const std::string& tablet_id() const { return meta_->tablet_id(); }
// Convenience method to return the permanent_uuid of this peer.
std::string permanent_uuid() const { return
tablet_->metadata()->fs_manager()->uuid(); }
@@ -322,7 +320,6 @@ class TabletReplica : public RefCountedThreadSafe<TabletReplica>,
const scoped_refptr<TabletMetadata> meta_;
const scoped_refptr<consensus::ConsensusMetadataManager> cmeta_manager_;
- const std::string tablet_id_;
const consensus::RaftPeerPB local_peer_pb_;
scoped_refptr<log::LogAnchorRegistry> log_anchor_registry_; // Assigned in
tablet_replica-test
diff --git a/src/kudu/tablet/tablet_replica_mm_ops.cc b/src/kudu/tablet/tablet_replica_mm_ops.cc
index 879425a..08b7ac6 100644
--- a/src/kudu/tablet/tablet_replica_mm_ops.cc
+++ b/src/kudu/tablet/tablet_replica_mm_ops.cc
@@ -21,11 +21,13 @@
#include <mutex>
#include <ostream>
#include <string>
+#include <utility>
#include <gflags/gflags.h>
#include <glog/logging.h>
#include "kudu/gutil/macros.h"
+#include "kudu/gutil/port.h"
#include "kudu/gutil/strings/substitute.h"
#include "kudu/tablet/tablet_metrics.h"
#include "kudu/util/flag_tags.h"
@@ -122,6 +124,20 @@ void FlushOpPerfImprovementPolicy::SetPerfImprovementForFlush(MaintenanceOpStats
}
//
+// TabletReplicaOpBase.
+//
+TabletReplicaOpBase::TabletReplicaOpBase(std::string name,
+ IOUsage io_usage,
+ TabletReplica* tablet_replica)
+ : MaintenanceOp(std::move(name), io_usage),
+ tablet_replica_(tablet_replica) {
+}
+
+const std::string& TabletReplicaOpBase::table_id() const {
+ return tablet_replica_->table_id();
+}
+
+//
// FlushMRSOp.
//
@@ -260,9 +276,10 @@ scoped_refptr<AtomicGauge<uint32_t> > FlushDeltaMemStoresOp::RunningGauge() cons
//
LogGCOp::LogGCOp(TabletReplica* tablet_replica)
- : MaintenanceOp(StringPrintf("LogGCOp(%s)",
tablet_replica->tablet()->tablet_id().c_str()),
- MaintenanceOp::LOW_IO_USAGE),
- tablet_replica_(tablet_replica),
+ : TabletReplicaOpBase(StringPrintf("LogGCOp(%s)",
+
tablet_replica->tablet()->tablet_id().c_str()),
+ MaintenanceOp::LOW_IO_USAGE,
+ tablet_replica),
log_gc_duration_(METRIC_log_gc_duration.Instantiate(
tablet_replica->tablet()->GetMetricEntity())),
log_gc_running_(METRIC_log_gc_running.Instantiate(
diff --git a/src/kudu/tablet/tablet_replica_mm_ops.h b/src/kudu/tablet/tablet_replica_mm_ops.h
index 2404b48..62b4cb9 100644
--- a/src/kudu/tablet/tablet_replica_mm_ops.h
+++ b/src/kudu/tablet/tablet_replica_mm_ops.h
@@ -21,7 +21,6 @@
#include <cstdint>
#include <string>
-#include "kudu/gutil/port.h"
#include "kudu/gutil/ref_counted.h"
#include "kudu/gutil/stringprintf.h"
#include "kudu/tablet/tablet.h"
@@ -47,86 +46,92 @@ class FlushOpPerfImprovementPolicy {
FlushOpPerfImprovementPolicy() {}
};
+class TabletReplicaOpBase : public MaintenanceOp {
+ public:
+ explicit TabletReplicaOpBase(std::string name, IOUsage io_usage,
TabletReplica* tablet_replica);
+
+ protected:
+ const std::string& table_id() const override;
+
+ TabletReplica *const tablet_replica_;
+};
+
// Maintenance op for MRS flush. Only one can happen at a time.
-class FlushMRSOp : public MaintenanceOp {
+class FlushMRSOp : public TabletReplicaOpBase {
public:
explicit FlushMRSOp(TabletReplica* tablet_replica)
- : MaintenanceOp(StringPrintf("FlushMRSOp(%s)",
tablet_replica->tablet()->tablet_id().c_str()),
- MaintenanceOp::HIGH_IO_USAGE),
- tablet_replica_(tablet_replica) {
+ : TabletReplicaOpBase(StringPrintf("FlushMRSOp(%s)",
+
tablet_replica->tablet()->tablet_id().c_str()),
+ MaintenanceOp::HIGH_IO_USAGE,
+ tablet_replica) {
time_since_flush_.start();
}
- virtual void UpdateStats(MaintenanceOpStats* stats) OVERRIDE;
+ void UpdateStats(MaintenanceOpStats* stats) override;
- virtual bool Prepare() OVERRIDE;
+ bool Prepare() override;
- virtual void Perform() OVERRIDE;
+ void Perform() override;
- virtual scoped_refptr<Histogram> DurationHistogram() const OVERRIDE;
+ scoped_refptr<Histogram> DurationHistogram() const override;
- virtual scoped_refptr<AtomicGauge<uint32_t> > RunningGauge() const OVERRIDE;
+ scoped_refptr<AtomicGauge<uint32_t> > RunningGauge() const override;
private:
// Lock protecting time_since_flush_.
mutable simple_spinlock lock_;
Stopwatch time_since_flush_;
-
- TabletReplica *const tablet_replica_;
};
// Maintenance op for DMS flush.
// Reports stats for all the DMS this tablet contains but only flushes one in Perform().
-class FlushDeltaMemStoresOp : public MaintenanceOp {
+class FlushDeltaMemStoresOp : public TabletReplicaOpBase {
public:
explicit FlushDeltaMemStoresOp(TabletReplica* tablet_replica)
- : MaintenanceOp(StringPrintf("FlushDeltaMemStoresOp(%s)",
-
tablet_replica->tablet()->tablet_id().c_str()),
- MaintenanceOp::HIGH_IO_USAGE),
- tablet_replica_(tablet_replica) {
+ : TabletReplicaOpBase(StringPrintf("FlushDeltaMemStoresOp(%s)",
+
tablet_replica->tablet()->tablet_id().c_str()),
+ MaintenanceOp::HIGH_IO_USAGE,
+ tablet_replica) {
time_since_flush_.start();
}
- virtual void UpdateStats(MaintenanceOpStats* stats) OVERRIDE;
+ void UpdateStats(MaintenanceOpStats* stats) override;
- virtual bool Prepare() OVERRIDE {
+ bool Prepare() override {
return true;
}
- virtual void Perform() OVERRIDE;
+ void Perform() override;
- virtual scoped_refptr<Histogram> DurationHistogram() const OVERRIDE;
+ scoped_refptr<Histogram> DurationHistogram() const override;
- virtual scoped_refptr<AtomicGauge<uint32_t> > RunningGauge() const OVERRIDE;
+ scoped_refptr<AtomicGauge<uint32_t> > RunningGauge() const override;
private:
// Lock protecting time_since_flush_
mutable simple_spinlock lock_;
Stopwatch time_since_flush_;
-
- TabletReplica *const tablet_replica_;
};
// Maintenance task that runs log GC. Reports log retention that represents the amount of data
// that can be GC'd.
//
// Only one LogGC op can run at a time.
-class LogGCOp : public MaintenanceOp {
+class LogGCOp : public TabletReplicaOpBase {
public:
explicit LogGCOp(TabletReplica* tablet_replica);
- virtual void UpdateStats(MaintenanceOpStats* stats) OVERRIDE;
+ void UpdateStats(MaintenanceOpStats* stats) override;
- virtual bool Prepare() OVERRIDE;
+ bool Prepare() override;
- virtual void Perform() OVERRIDE;
+ void Perform() override;
- virtual scoped_refptr<Histogram> DurationHistogram() const OVERRIDE;
+ scoped_refptr<Histogram> DurationHistogram() const override;
- virtual scoped_refptr<AtomicGauge<uint32_t> > RunningGauge() const OVERRIDE;
+ scoped_refptr<AtomicGauge<uint32_t> > RunningGauge() const override;
private:
- TabletReplica *const tablet_replica_;
scoped_refptr<Histogram> log_gc_duration_;
scoped_refptr<AtomicGauge<uint32_t> > log_gc_running_;
mutable Semaphore sem_;
diff --git a/src/kudu/util/maintenance_manager-test.cc b/src/kudu/util/maintenance_manager-test.cc
index 6777e06..8619453 100644
--- a/src/kudu/util/maintenance_manager-test.cc
+++ b/src/kudu/util/maintenance_manager-test.cc
@@ -15,24 +15,28 @@
// specific language governing permissions and limitations
// under the License.
+#include "kudu/util/maintenance_manager.h"
+
+#include <math.h>
+
+#include <algorithm>
#include <atomic>
#include <cstdint>
+#include <list>
#include <memory>
#include <mutex>
#include <ostream>
#include <string>
#include <utility>
-#include <vector>
#include <boost/bind.hpp> // IWYU pragma: keep
+#include <gflags/gflags.h>
#include <gflags/gflags_declare.h>
#include <glog/logging.h>
#include <gtest/gtest.h>
-#include "kudu/gutil/port.h"
#include "kudu/gutil/ref_counted.h"
#include "kudu/gutil/strings/substitute.h"
-#include "kudu/util/maintenance_manager.h"
#include "kudu/util/maintenance_manager.pb.h"
#include "kudu/util/metrics.h"
#include "kudu/util/monotime.h"
@@ -42,9 +46,9 @@
#include "kudu/util/test_util.h"
#include "kudu/util/thread.h"
+using std::list;
using std::shared_ptr;
using std::string;
-using std::vector;
using strings::Substitute;
METRIC_DEFINE_entity(test);
@@ -57,17 +61,29 @@ METRIC_DEFINE_histogram(test, maintenance_op_duration,
kudu::MetricUnit::kSeconds, "", 60000000LU, 2);
DECLARE_int64(log_target_replay_size_mb);
+DECLARE_string(maintenance_manager_table_priorities);
+DECLARE_double(maintenance_op_multiplier);
+DECLARE_int32(max_priority_range);
+
namespace kudu {
-static const int kHistorySize = 4;
+static const int kHistorySize = 6;
static const char kFakeUuid[] = "12345";
class MaintenanceManagerTest : public KuduTest {
public:
void SetUp() override {
+ StartManager(2);
+ }
+
+ void TearDown() override {
+ StopManager();
+ }
+
+ void StartManager(int32_t num_threads) {
MaintenanceManager::Options options;
- options.num_threads = 2;
+ options.num_threads = num_threads;
options.polling_interval_ms = 1;
options.history_size = kHistorySize;
manager_.reset(new MaintenanceManager(options, kFakeUuid));
@@ -78,7 +94,7 @@ class MaintenanceManagerTest : public KuduTest {
ASSERT_OK(manager_->Start());
}
- void TearDown() override {
+ void StopManager() {
manager_->Shutdown();
}
@@ -95,7 +111,8 @@ TEST_F(MaintenanceManagerTest, TestCreateAndShutdown) {
class TestMaintenanceOp : public MaintenanceOp {
public:
TestMaintenanceOp(const std::string& name,
- IOUsage io_usage)
+ IOUsage io_usage,
+ std::string table_id = "fake.table_id")
: MaintenanceOp(name, io_usage),
ram_anchored_(500),
logs_retained_bytes_(0),
@@ -105,12 +122,13 @@ class TestMaintenanceOp : public MaintenanceOp {
maintenance_ops_running_(METRIC_maintenance_ops_running.Instantiate(metric_entity_,
0)),
remaining_runs_(1),
prepared_runs_(0),
- sleep_time_(MonoDelta::FromSeconds(0)) {
+ sleep_time_(MonoDelta::FromSeconds(0)),
+ table_id_(std::move(table_id)) {
}
- virtual ~TestMaintenanceOp() {}
+ ~TestMaintenanceOp() override = default;
- virtual bool Prepare() OVERRIDE {
+ bool Prepare() override {
std::lock_guard<Mutex> guard(lock_);
if (remaining_runs_ == 0) {
return false;
@@ -121,7 +139,7 @@ class TestMaintenanceOp : public MaintenanceOp {
return true;
}
- virtual void Perform() OVERRIDE {
+ void Perform() override {
{
std::lock_guard<Mutex> guard(lock_);
DLOG(INFO) << "Performing op " << name();
@@ -135,7 +153,7 @@ class TestMaintenanceOp : public MaintenanceOp {
SleepFor(sleep_time_);
}
- virtual void UpdateStats(MaintenanceOpStats* stats) OVERRIDE {
+ void UpdateStats(MaintenanceOpStats* stats) override {
std::lock_guard<Mutex> guard(lock_);
stats->set_runnable(remaining_runs_ > 0);
stats->set_ram_anchored(ram_anchored_);
@@ -168,14 +186,18 @@ class TestMaintenanceOp : public MaintenanceOp {
perf_improvement_ = perf_improvement;
}
- virtual scoped_refptr<Histogram> DurationHistogram() const OVERRIDE {
+ scoped_refptr<Histogram> DurationHistogram() const override {
return maintenance_op_duration_;
}
- virtual scoped_refptr<AtomicGauge<uint32_t> > RunningGauge() const OVERRIDE {
+ scoped_refptr<AtomicGauge<uint32_t> > RunningGauge() const override {
return maintenance_ops_running_;
}
+ const std::string& table_id() const override {
+ return table_id_;
+ }
+
private:
Mutex lock_;
@@ -195,6 +217,7 @@ class TestMaintenanceOp : public MaintenanceOp {
// The amount of time each op invocation will sleep.
MonoDelta sleep_time_;
+ std::string table_id_;
};
// Create an op and wait for it to start running. Unregister it while it is
@@ -266,7 +289,7 @@ TEST_F(MaintenanceManagerTest, TestMemoryPressure) {
TEST_F(MaintenanceManagerTest, TestLogRetentionPrioritization) {
const int64_t kMB = 1024 * 1024;
- manager_->Shutdown();
+ StopManager();
TestMaintenanceOp op1("op1", MaintenanceOp::LOW_IO_USAGE);
op1.set_ram_anchored(0);
@@ -302,13 +325,13 @@ TEST_F(MaintenanceManagerTest, TestLogRetentionPrioritization) {
FLAGS_log_target_replay_size_mb = 50;
op_and_why = manager_->FindBestOp();
ASSERT_EQ(&op3, op_and_why.first);
- EXPECT_EQ(op_and_why.second, "104857600 bytes log retention");
+ EXPECT_EQ(op_and_why.second, "104857600 bytes log retention, and flush 200
bytes memory");
manager_->UnregisterOp(&op3);
op_and_why = manager_->FindBestOp();
ASSERT_EQ(&op2, op_and_why.first);
- EXPECT_EQ(op_and_why.second, "104857600 bytes log retention");
+ EXPECT_EQ(op_and_why.second, "104857600 bytes log retention, and flush 100
bytes memory");
manager_->UnregisterOp(&op2);
}
@@ -341,10 +364,11 @@ TEST_F(MaintenanceManagerTest, TestRunningInstances) {
manager_->GetMaintenanceManagerStatusDump(&status_pb);
ASSERT_EQ(status_pb.running_operations_size(), 0);
}
+
// Test adding operations and make sure that the history of recently completed operations
// is correct in that it wraps around and doesn't grow.
TEST_F(MaintenanceManagerTest, TestCompletedOpsHistory) {
- for (int i = 0; i < 5; i++) {
+ for (int i = 0; i < kHistorySize + 1; i++) {
string name = Substitute("op$0", i);
TestMaintenanceOp op(name, MaintenanceOp::HIGH_IO_USAGE);
op.set_perf_improvement(1);
@@ -358,12 +382,123 @@ TEST_F(MaintenanceManagerTest, TestCompletedOpsHistory) {
MaintenanceManagerStatusPB status_pb;
manager_->GetMaintenanceManagerStatusDump(&status_pb);
- // The size should be at most the history_size.
- ASSERT_GE(kHistorySize, status_pb.completed_operations_size());
+ // The size should equal to the current completed OP size,
+ // and should be at most the kHistorySize.
+ ASSERT_EQ(std::min(kHistorySize, i + 1),
status_pb.completed_operations_size());
// The most recently completed op should always be first, even if we wrap
// around.
ASSERT_EQ(name, status_pb.completed_operations(0).name());
}
}
+// Test maintenance OP factors.
+// The OPs on different priority levels have different OP score multipliers.
+TEST_F(MaintenanceManagerTest, TestOpFactors) {
+ StopManager();
+
+ ASSERT_GE(FLAGS_max_priority_range, 1);
+ ASSERT_NE("", gflags::SetCommandLineOption(
+ "maintenance_manager_table_priorities",
+
Substitute("table_id_1:$0;table_id_2:$1;table_id_3:$2;table_id_4:$3;table_id_5:$4",
+ -FLAGS_max_priority_range - 1, -1, 0, 1,
FLAGS_max_priority_range + 1).c_str()));
+ TestMaintenanceOp op1("op1", MaintenanceOp::HIGH_IO_USAGE, "table_id_1");
+ TestMaintenanceOp op2("op2", MaintenanceOp::HIGH_IO_USAGE, "table_id_2");
+ TestMaintenanceOp op3("op3", MaintenanceOp::HIGH_IO_USAGE, "table_id_3");
+ TestMaintenanceOp op4("op4", MaintenanceOp::HIGH_IO_USAGE, "table_id_4");
+ TestMaintenanceOp op5("op5", MaintenanceOp::HIGH_IO_USAGE, "table_id_5");
+ TestMaintenanceOp op6("op6", MaintenanceOp::HIGH_IO_USAGE, "table_id_6");
+
+ manager_->UpdateTablePriorities();
+
+ ASSERT_DOUBLE_EQ(pow(FLAGS_maintenance_op_multiplier,
-FLAGS_max_priority_range),
+ manager_->PerfImprovement(1, op1.table_id()));
+ ASSERT_DOUBLE_EQ(pow(FLAGS_maintenance_op_multiplier, -1),
+ manager_->PerfImprovement(1, op2.table_id()));
+ ASSERT_DOUBLE_EQ(1, manager_->PerfImprovement(1, op3.table_id()));
+ ASSERT_DOUBLE_EQ(FLAGS_maintenance_op_multiplier,
manager_->PerfImprovement(1, op4.table_id()));
+ ASSERT_DOUBLE_EQ(pow(FLAGS_maintenance_op_multiplier,
FLAGS_max_priority_range),
+ manager_->PerfImprovement(1, op5.table_id()));
+ ASSERT_DOUBLE_EQ(1, manager_->PerfImprovement(1, op6.table_id()));
+}
+
+// Test priority OP launching.
+TEST_F(MaintenanceManagerTest, TestPriorityOpLaunch) {
+ StopManager();
+ StartManager(1);
+
+ ASSERT_NE("", gflags::SetCommandLineOption(
+ "maintenance_manager_table_priorities",
+
Substitute("table_id_1:$0;table_id_2:$1;table_id_3:$2;table_id_4:$3;table_id_5:$4",
+ -FLAGS_max_priority_range - 1, -1, 0, 1,
FLAGS_max_priority_range + 1).c_str()));
+
+ TestMaintenanceOp op1("op1", MaintenanceOp::HIGH_IO_USAGE, "table_id_1");
+ op1.set_perf_improvement(10);
+ op1.set_remaining_runs(1);
+ op1.set_sleep_time(MonoDelta::FromMilliseconds(1));
+
+ TestMaintenanceOp op2("op2", MaintenanceOp::HIGH_IO_USAGE, "table_id_2");
+ op2.set_perf_improvement(10);
+ op2.set_remaining_runs(1);
+ op2.set_sleep_time(MonoDelta::FromMilliseconds(1));
+
+ TestMaintenanceOp op3("op3", MaintenanceOp::HIGH_IO_USAGE, "table_id_3");
+ op3.set_perf_improvement(10);
+ op3.set_remaining_runs(1);
+ op3.set_sleep_time(MonoDelta::FromMilliseconds(1));
+
+ TestMaintenanceOp op4("op4", MaintenanceOp::HIGH_IO_USAGE, "table_id_4");
+ op4.set_perf_improvement(10);
+ op4.set_remaining_runs(1);
+ op4.set_sleep_time(MonoDelta::FromMilliseconds(1));
+
+ TestMaintenanceOp op5("op5", MaintenanceOp::HIGH_IO_USAGE, "table_id_5");
+ op5.set_perf_improvement(10);
+ op5.set_remaining_runs(1);
+ op5.set_sleep_time(MonoDelta::FromMilliseconds(1));
+
+ TestMaintenanceOp op6("op6", MaintenanceOp::HIGH_IO_USAGE, "table_id_6");
+ op6.set_perf_improvement(12);
+ op6.set_remaining_runs(1);
+ op6.set_sleep_time(MonoDelta::FromMilliseconds(1));
+
+ manager_->RegisterOp(&op1);
+ manager_->RegisterOp(&op2);
+ manager_->RegisterOp(&op3);
+ manager_->RegisterOp(&op4);
+ manager_->RegisterOp(&op5);
+ manager_->RegisterOp(&op6);
+
+ ASSERT_EVENTUALLY([&]() {
+ MaintenanceManagerStatusPB status_pb;
+ manager_->GetMaintenanceManagerStatusDump(&status_pb);
+ ASSERT_EQ(status_pb.completed_operations_size(), 6);
+ });
+
+ // Wait for instances to complete.
+ manager_->UnregisterOp(&op1);
+ manager_->UnregisterOp(&op2);
+ manager_->UnregisterOp(&op3);
+ manager_->UnregisterOp(&op4);
+ manager_->UnregisterOp(&op5);
+ manager_->UnregisterOp(&op6);
+
+ // Check that running instances are removed from collection after completion.
+ MaintenanceManagerStatusPB status_pb;
+ manager_->GetMaintenanceManagerStatusDump(&status_pb);
+ ASSERT_EQ(status_pb.running_operations_size(), 0);
+ ASSERT_EQ(status_pb.completed_operations_size(), 6);
+ // Completed ops are listed most recent first, and higher-scoring ops run first, so the list is in ascending order of priority-adjusted perf_improvement score.
+ list<string> ordered_ops({"op1",
+ "op2",
+ "op3",
+ "op4",
+ "op6",
+ "op5"});
+ ASSERT_EQ(ordered_ops.size(), status_pb.completed_operations().size());
+ for (const auto& instance : status_pb.completed_operations()) {
+ ASSERT_EQ(ordered_ops.front(), instance.name());
+ ordered_ops.pop_front();
+ }
+}
+
} // namespace kudu
diff --git a/src/kudu/util/maintenance_manager.cc b/src/kudu/util/maintenance_manager.cc
index ea607aa..f783717 100644
--- a/src/kudu/util/maintenance_manager.cc
+++ b/src/kudu/util/maintenance_manager.cc
@@ -17,8 +17,9 @@
#include "kudu/util/maintenance_manager.h"
+#include <algorithm>
#include <cinttypes>
-#include <cstddef>
+#include <cmath>
#include <cstdint>
#include <memory>
#include <mutex>
@@ -26,6 +27,7 @@
#include <string>
#include <type_traits>
#include <utility>
+#include <vector>
#include <boost/bind.hpp>
#include <gflags/gflags.h>
@@ -33,6 +35,8 @@
#include "kudu/gutil/dynamic_annotations.h"
#include "kudu/gutil/map-util.h"
#include "kudu/gutil/stringprintf.h"
+#include "kudu/gutil/strings/numbers.h"
+#include "kudu/gutil/strings/split.h"
#include "kudu/gutil/strings/substitute.h"
#include "kudu/util/debug/trace_event.h"
#include "kudu/util/debug/trace_logging.h"
@@ -50,6 +54,8 @@
using std::pair;
using std::string;
+using std::vector;
+using strings::Split;
using strings::Substitute;
DEFINE_int32(maintenance_manager_num_threads, 1,
@@ -91,6 +97,29 @@ DEFINE_double(data_gc_prioritization_prob, 0.5,
"such as delta compaction.");
TAG_FLAG(data_gc_prioritization_prob, experimental);
+DEFINE_string(maintenance_manager_table_priorities, "",
+ "Priorities of tables, semicolon-separated list of
table-priority pairs, and each "
+ "table-priority pair is combined by table id, colon and priority
level. Priority "
+ "level is ranged in [-FLAGS_max_priority_range,
FLAGS_max_priority_range]");
+TAG_FLAG(maintenance_manager_table_priorities, advanced);
+TAG_FLAG(maintenance_manager_table_priorities, experimental);
+TAG_FLAG(maintenance_manager_table_priorities, runtime);
+
+DEFINE_double(maintenance_op_multiplier, 1.1,
+ "Multiplier applied on different priority levels, table
maintenance OPs on level N "
+ "has multiplier of FLAGS_maintenance_op_multiplier^N, the last
score will be "
+ "multiplied by this multiplier. Note: this multiplier is only
take effect on "
+ "compaction OPs");
+TAG_FLAG(maintenance_op_multiplier, advanced);
+TAG_FLAG(maintenance_op_multiplier, experimental);
+TAG_FLAG(maintenance_op_multiplier, runtime);
+
+DEFINE_int32(max_priority_range, 5,
+ "Maximal priority range of OPs.");
+TAG_FLAG(max_priority_range, advanced);
+TAG_FLAG(max_priority_range, experimental);
+TAG_FLAG(max_priority_range, runtime);
+
namespace kudu {
MaintenanceOpStats::MaintenanceOpStats() {
@@ -107,7 +136,7 @@ void MaintenanceOpStats::Clear() {
last_modified_ = MonoTime();
}
-MaintenanceOp::MaintenanceOp(std::string name, IOUsage io_usage)
+MaintenanceOp::MaintenanceOp(string name, IOUsage io_usage)
: name_(std::move(name)),
running_(0),
cancel_(false),
@@ -129,10 +158,10 @@ MaintenanceManagerStatusPB_OpInstancePB OpInstance::DumpToPB() const {
pb.set_thread_id(thread_id);
pb.set_name(name);
if (duration.Initialized()) {
- pb.set_duration_millis(duration.ToMilliseconds());
+ pb.set_duration_millis(static_cast<int32_t>(duration.ToMilliseconds()));
}
MonoDelta delta(MonoTime::Now() - start_mono_time);
- pb.set_millis_since_start(delta.ToMilliseconds());
+ pb.set_millis_since_start(static_cast<int32_t>(delta.ToMilliseconds()));
return pb;
}
@@ -143,7 +172,7 @@ const MaintenanceManager::Options MaintenanceManager::kDefaultOptions = {
};
MaintenanceManager::MaintenanceManager(const Options& options,
- std::string server_uuid)
+ string server_uuid)
: server_uuid_(std::move(server_uuid)),
num_threads_(options.num_threads <= 0 ?
FLAGS_maintenance_manager_num_threads : options.num_threads),
@@ -264,8 +293,7 @@ void MaintenanceManager::RunSchedulerThread() {
// 1) there are no free threads available to perform a maintenance op.
// or 2) we just tried to schedule an op but found nothing to run.
// However, if it's time to shut down, we want to do so immediately.
- while ((running_ops_ >= num_threads_ || prev_iter_found_no_work ||
disabled_for_tests()) &&
- !shutdown_) {
+ while (CouldNotLaunchNewOp(prev_iter_found_no_work)) {
cond_.WaitFor(polling_interval);
prev_iter_found_no_work = false;
}
@@ -274,42 +302,46 @@ void MaintenanceManager::RunSchedulerThread() {
return;
}
- // Find the best op.
- auto best_op_and_why = FindBestOp();
- auto* op = best_op_and_why.first;
- const auto& note = best_op_and_why.second;
+ // TODO(yingchun): move this into a gflags setter callback so it runs only when the flag changes, not on every scheduler iteration.
+ UpdateTablePriorities();
// If we found no work to do, then we should sleep before trying again to schedule.
// Otherwise, we can go right into trying to find the next op.
- prev_iter_found_no_work = (op == nullptr);
- if (!op) {
- VLOG_AND_TRACE_WITH_PREFIX("maintenance", 2)
- << "No maintenance operations look worth doing.";
- continue;
- }
+ prev_iter_found_no_work = !FindAndLaunchOp(&guard);
+ }
+}
- // Prepare the maintenance operation.
- op->running_++;
- running_ops_++;
- guard.unlock();
- bool ready = op->Prepare();
- guard.lock();
- if (!ready) {
- LOG_WITH_PREFIX(INFO) << "Prepare failed for " << op->name()
- << ". Re-running scheduler.";
- op->running_--;
- running_ops_--;
- op->cond_->Signal();
- continue;
- }
+bool MaintenanceManager::FindAndLaunchOp(std::unique_lock<Mutex>* guard) {
+ // Find the best op.
+ auto best_op_and_why = FindBestOp();
+ auto* op = best_op_and_why.first;
+ const auto& note = best_op_and_why.second;
- LOG_AND_TRACE_WITH_PREFIX("maintenance", INFO)
- << Substitute("Scheduling $0: $1", op->name(), note);
- // Run the maintenance operation.
- Status s = thread_pool_->SubmitFunc(boost::bind(
- &MaintenanceManager::LaunchOp, this, op));
- CHECK(s.ok());
+ if (!op) {
+ VLOG_AND_TRACE_WITH_PREFIX("maintenance", 2)
+ << "No maintenance operations look worth doing.";
+ return false;
}
+
+ // Prepare the maintenance operation.
+ IncreaseOpCount(op);
+ guard->unlock();
+ bool ready = op->Prepare();
+ guard->lock();
+ if (!ready) {
+ LOG_WITH_PREFIX(INFO) << "Prepare failed for " << op->name()
+ << ". Re-running scheduler.";
+ DecreaseOpCount(op);
+ op->cond_->Signal();
+ return true;
+ }
+
+ LOG_AND_TRACE_WITH_PREFIX("maintenance", INFO)
+ << Substitute("Scheduling $0: $1", op->name(), note);
+ // Run the maintenance operation.
+ CHECK_OK(thread_pool_->SubmitFunc(boost::bind(&MaintenanceManager::LaunchOp,
this, op)));
+
+ return true;
}
// Finding the best operation goes through four filters:
@@ -335,8 +367,7 @@ void MaintenanceManager::RunSchedulerThread() {
pair<MaintenanceOp*, string> MaintenanceManager::FindBestOp() {
TRACE_EVENT0("maintenance", "MaintenanceManager::FindBestOp");
- size_t free_threads = num_threads_ - running_ops_;
- if (free_threads == 0) {
+ if (!HasFreeThreads()) {
return {nullptr, "no free threads"};
}
@@ -348,7 +379,7 @@ pair<MaintenanceOp*, string>
MaintenanceManager::FindBestOp() {
int64_t most_logs_retained_bytes = 0;
int64_t most_logs_retained_bytes_ram_anchored = 0;
- MaintenanceOp* most_logs_retained_bytes_op = nullptr;
+ MaintenanceOp* most_logs_retained_bytes_ram_anchored_op = nullptr;
int64_t most_data_retained_bytes = 0;
MaintenanceOp* most_data_retained_bytes_op = nullptr;
@@ -367,8 +398,8 @@ pair<MaintenanceOp*, string>
MaintenanceManager::FindBestOp() {
}
const auto logs_retained_bytes = stats.logs_retained_bytes();
- if (logs_retained_bytes > low_io_most_logs_retained_bytes &&
- op->io_usage() == MaintenanceOp::LOW_IO_USAGE) {
+ if (op->io_usage() == MaintenanceOp::LOW_IO_USAGE &&
+ logs_retained_bytes > low_io_most_logs_retained_bytes) {
low_io_most_logs_retained_bytes_op = op;
low_io_most_logs_retained_bytes = logs_retained_bytes;
VLOG_AND_TRACE_WITH_PREFIX("maintenance", 2)
@@ -387,7 +418,7 @@ pair<MaintenanceOp*, string>
MaintenanceManager::FindBestOp() {
if (std::make_pair(logs_retained_bytes, ram_anchored) >
std::make_pair(most_logs_retained_bytes,
most_logs_retained_bytes_ram_anchored)) {
- most_logs_retained_bytes_op = op;
+ most_logs_retained_bytes_ram_anchored_op = op;
most_logs_retained_bytes = logs_retained_bytes;
most_logs_retained_bytes_ram_anchored = ram_anchored;
}
@@ -401,7 +432,7 @@ pair<MaintenanceOp*, string>
MaintenanceManager::FindBestOp() {
op->name(), data_retained_bytes);
}
- const auto perf_improvement = stats.perf_improvement();
+ const auto perf_improvement = PerfImprovement(stats.perf_improvement(),
op->table_id());
if ((!best_perf_improvement_op) ||
(perf_improvement > best_perf_improvement)) {
best_perf_improvement_op = op;
@@ -420,7 +451,7 @@ pair<MaintenanceOp*, string>
MaintenanceManager::FindBestOp() {
double capacity_pct;
if (memory_pressure_func_(&capacity_pct)) {
if (!most_ram_anchored_op) {
- std::string msg = StringPrintf("System under memory pressure "
+ string msg = StringPrintf("System under memory pressure "
"(%.2f%% of limit used). However, there are no ops currently "
"runnable which would free memory.", capacity_pct);
KLOG_EVERY_N_SECS(WARNING, 5) << msg;
@@ -432,10 +463,13 @@ pair<MaintenanceOp*, string>
MaintenanceManager::FindBestOp() {
return {most_ram_anchored_op, std::move(note)};
}
- if (most_logs_retained_bytes_op &&
+ // Look at ops that free up more log retention, and also free up more memory.
+ if (most_logs_retained_bytes_ram_anchored_op &&
most_logs_retained_bytes / 1024 / 1024 >=
FLAGS_log_target_replay_size_mb) {
- string note = Substitute("$0 bytes log retention",
most_logs_retained_bytes);
- return {most_logs_retained_bytes_op, std::move(note)};
+ string note = Substitute("$0 bytes log retention, and flush $1 bytes
memory",
+ most_logs_retained_bytes,
+ most_logs_retained_bytes_ram_anchored);
+ return {most_logs_retained_bytes_ram_anchored_op, std::move(note)};
}
// Look at ops that we can run quickly that free up data on disk.
@@ -449,6 +483,7 @@ pair<MaintenanceOp*, string>
MaintenanceManager::FindBestOp() {
VLOG(1) << "Skipping data GC due to prioritizing perf improvement";
}
+ // Look at ops that can improve read/write performance most.
if (best_perf_improvement_op && best_perf_improvement > 0) {
string note = StringPrintf("perf score=%.6f", best_perf_improvement);
return {best_perf_improvement_op, std::move(note)};
@@ -456,6 +491,16 @@ pair<MaintenanceOp*, string>
MaintenanceManager::FindBestOp() {
return {nullptr, "no ops with positive improvement"};
}
+double MaintenanceManager::PerfImprovement(double perf_improvement,
+ const string& table_id) const {
+ int32_t priority = 0;
+ if (!FindCopy(table_priorities_, table_id, &priority)) {
+ return perf_improvement;
+ }
+
+ return perf_improvement * std::pow(FLAGS_maintenance_op_multiplier,
priority);
+}
+
void MaintenanceManager::LaunchOp(MaintenanceOp* op) {
int64_t thread_id = Thread::CurrentThreadId();
OpInstance op_instance;
@@ -481,9 +526,7 @@ void MaintenanceManager::LaunchOp(MaintenanceOp* op) {
completed_ops_count_++;
op->DurationHistogram()->Increment(op_instance.duration.ToMilliseconds());
-
- running_ops_--;
- op->running_--;
+ DecreaseOpCount(op);
op->cond_->Signal();
cond_.Signal(); // Wake up scheduler.
});
@@ -507,9 +550,6 @@ void MaintenanceManager::LaunchOp(MaintenanceOp* op) {
void
MaintenanceManager::GetMaintenanceManagerStatusDump(MaintenanceManagerStatusPB*
out_pb) {
DCHECK(out_pb != nullptr);
std::lock_guard<Mutex> guard(lock_);
- auto best_op_and_why = FindBestOp();
- auto* best_op = best_op_and_why.first;
-
for (const auto& val : ops_) {
MaintenanceManagerStatusPB_MaintenanceOpPB* op_pb =
out_pb->add_registered_operations();
MaintenanceOp* op(val.first);
@@ -527,10 +567,6 @@ void
MaintenanceManager::GetMaintenanceManagerStatusDump(MaintenanceManagerStatu
op_pb->set_logs_retained_bytes(0);
op_pb->set_perf_improvement(0.0);
}
-
- if (best_op == op) {
- out_pb->mutable_best_op()->CopyFrom(*op_pb);
- }
}
{
@@ -540,8 +576,9 @@ void
MaintenanceManager::GetMaintenanceManagerStatusDump(MaintenanceManagerStatu
}
}
+ // The latest completed op will be dumped at first.
for (int n = 1; n <= completed_ops_.size(); n++) {
- int i = completed_ops_count_ - n;
+ int64_t i = completed_ops_count_ - n;
if (i < 0) break;
const auto& completed_op = completed_ops_[i % completed_ops_.size()];
@@ -551,8 +588,48 @@ void
MaintenanceManager::GetMaintenanceManagerStatusDump(MaintenanceManagerStatu
}
}
-std::string MaintenanceManager::LogPrefix() const {
+string MaintenanceManager::LogPrefix() const {
return Substitute("P $0: ", server_uuid_);
}
+bool MaintenanceManager::HasFreeThreads() {
+ return num_threads_ - running_ops_ > 0;
+}
+
+bool MaintenanceManager::CouldNotLaunchNewOp(bool prev_iter_found_no_work) {
+ return (!HasFreeThreads() || prev_iter_found_no_work ||
disabled_for_tests()) && !shutdown_;
+}
+
+void MaintenanceManager::UpdateTablePriorities() {
+  string table_priorities_str;  // Form: "<table_id>:<priority>;..."
+  CHECK(google::GetCommandLineOption("maintenance_manager_table_priorities",
+                                     &table_priorities_str));
+
+  TablePriorities table_priorities;  // Applied only if every entry parses.
+  int32_t value;
+  for (auto table_priority_str : Split(table_priorities_str, ";", strings::SkipEmpty())) {
+    vector<string> table_priority = Split(table_priority_str, ":", strings::SkipEmpty());
+    if (table_priority.size() == 2 && safe_strto32_base(table_priority[1].c_str(), &value, 10)) {
+      value = std::max(value, -FLAGS_max_priority_range);
+      value = std::min(value, FLAGS_max_priority_range);  // Clamp to +/-max_priority_range.
+      table_priorities[table_priority[0]] = value;
+    } else {  // Malformed entry (not "<table_id>:<priority>", or priority not an int32).
+      KLOG_EVERY_N_SECS(WARNING, 60) << "Failed to parse flag "
+          "maintenance_manager_table_priorities: " << table_priorities_str;
+      return;  // Called every scheduler pass; keep the previously applied priorities.
+    }
+  }
+  table_priorities_.swap(table_priorities);
+}
+
+void MaintenanceManager::IncreaseOpCount(MaintenanceOp* op) {
+  op->running_++;  // Per-op count of currently running instances.
+  running_ops_++;  // Manager-wide count consulted by HasFreeThreads().
+}
+
+void MaintenanceManager::DecreaseOpCount(MaintenanceOp* op) {
+  op->running_--;  // Must mirror IncreaseOpCount() exactly.
+  running_ops_--;
+}
+
} // namespace kudu
diff --git a/src/kudu/util/maintenance_manager.h
b/src/kudu/util/maintenance_manager.h
index c5dd8ac..a4e6b96 100644
--- a/src/kudu/util/maintenance_manager.h
+++ b/src/kudu/util/maintenance_manager.h
@@ -229,6 +229,9 @@ class MaintenanceOp {
cancel_.Store(true);
}
+ protected:
+ virtual const std::string& table_id() const = 0;
+
private:
DISALLOW_COPY_AND_ASSIGN(MaintenanceOp);
@@ -302,8 +305,11 @@ class MaintenanceManager : public
std::enable_shared_from_this<MaintenanceManage
private:
FRIEND_TEST(MaintenanceManagerTest, TestLogRetentionPrioritization);
+ FRIEND_TEST(MaintenanceManagerTest, TestOpFactors);
+
typedef std::map<MaintenanceOp*, MaintenanceOpStats,
MaintenanceOpComparator> OpMapTy;
+ typedef std::unordered_map<std::string, int32_t> TablePriorities;
// Return true if tests have currently disabled the maintenance
// manager by way of changing the gflags at runtime.
@@ -311,17 +317,32 @@ class MaintenanceManager : public
std::enable_shared_from_this<MaintenanceManage
void RunSchedulerThread();
+ bool FindAndLaunchOp(std::unique_lock<Mutex>* guard);
+
// Find the best op, or null if there is nothing we want to run.
//
// Returns the op, as well as a string explanation of why that op was chosen,
// suitable for logging.
std::pair<MaintenanceOp*, std::string> FindBestOp();
+ double PerfImprovement(double perf_improvement,
+ const std::string& table_id) const;
+
void LaunchOp(MaintenanceOp* op);
std::string LogPrefix() const;
+ bool HasFreeThreads();
+
+ bool CouldNotLaunchNewOp(bool prev_iter_found_no_work);
+
+ void UpdateTablePriorities();
+
+  void IncreaseOpCount(MaintenanceOp* op);
+  void DecreaseOpCount(MaintenanceOp* op);
+
const std::string server_uuid_;
+ TablePriorities table_priorities_;
const int32_t num_threads_;
OpMapTy ops_; // Registered operations.
Mutex lock_;
@@ -330,7 +351,7 @@ class MaintenanceManager : public
std::enable_shared_from_this<MaintenanceManage
ConditionVariable cond_;
bool shutdown_;
int32_t polling_interval_ms_;
- uint64_t running_ops_;
+ int32_t running_ops_;
// Vector used as a circular buffer for recently completed ops. Elements
need to be added at
// the completed_ops_count_ % the vector's size and then the count needs to
be incremented.
std::vector<OpInstance> completed_ops_;
diff --git a/src/kudu/util/maintenance_manager.proto
b/src/kudu/util/maintenance_manager.proto
index b6b1203..77c8cf8 100644
--- a/src/kudu/util/maintenance_manager.proto
+++ b/src/kudu/util/maintenance_manager.proto
@@ -26,7 +26,7 @@ message MaintenanceManagerStatusPB {
// Number of times this operation is currently running.
required uint32 running = 2;
required bool runnable = 3;
- required uint64 ram_anchored_bytes = 4;
+ required int64 ram_anchored_bytes = 4;
required int64 logs_retained_bytes = 5;
required double perf_improvement = 6;
}
@@ -40,15 +40,12 @@ message MaintenanceManagerStatusPB {
required int32 millis_since_start = 4;
}
- // The next operation that would run.
- optional MaintenanceOpPB best_op = 1;
-
// List of all the operations.
- repeated MaintenanceOpPB registered_operations = 2;
+ repeated MaintenanceOpPB registered_operations = 1;
// This list isn't in order of anything. Can contain the same operation
multiple times.
- repeated OpInstancePB running_operations = 3;
+ repeated OpInstancePB running_operations = 2;
// This list isn't in order of anything. Can contain the same operation
multiple times.
- repeated OpInstancePB completed_operations = 4;
+ repeated OpInstancePB completed_operations = 3;
}