This is an automated email from the ASF dual-hosted git repository.
morrysnow pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.1 by this push:
new 5e222ecc27a branch-3.1: [Enhancement](Compaction) Support auto set
compaction task num per round #53408 (#53775)
5e222ecc27a is described below
commit 5e222ecc27ae615872738e1a79b4c23990c701ab
Author: abmdocrt <[email protected]>
AuthorDate: Fri Aug 1 10:10:09 2025 +0800
branch-3.1: [Enhancement](Compaction) Support auto set compaction task num
per round #53408 (#53775)
Pick #53408
---
be/src/common/config.cpp | 7 +-
be/src/common/config.h | 1 +
be/src/olap/olap_server.cpp | 32 +++++++
be/src/olap/storage_engine.h | 4 +
be/src/olap/tablet_manager.cpp | 14 ++--
be/test/olap/compaction_task_test.cpp | 153 ++++++++++++++++++++++++++++++++++
be/test/olap/tablet_mgr_test.cpp | 19 +++--
7 files changed, 214 insertions(+), 16 deletions(-)
diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index b3f56b285db..252600723e5 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -1508,7 +1508,12 @@ DEFINE_Bool(enable_table_size_correctness_check,
"false");
DEFINE_Bool(force_regenerate_rowsetid_on_start_error, "false");
DEFINE_mBool(enable_sleep_between_delete_cumu_compaction, "false");
-DEFINE_mInt32(compaction_num_per_round, "4");
+// The number of compaction tasks generated each time.
+// -1 means automatic number, other values mean fixed number.
+DEFINE_mInt32(compaction_num_per_round, "-1");
+// Max automatic compaction task generated num per round.
+// Only valid if "compaction_num_per_round = -1"
+DEFINE_mInt32(max_automatic_compaction_num_per_round, "64");
DEFINE_mInt32(check_tablet_delete_bitmap_interval_seconds, "300");
DEFINE_mInt32(check_tablet_delete_bitmap_score_top_n, "10");
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 1ee4b9d6821..2a9ba7bd293 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -1582,6 +1582,7 @@ DECLARE_Bool(enable_table_size_correctness_check);
DECLARE_mBool(enable_sleep_between_delete_cumu_compaction);
DECLARE_mInt32(compaction_num_per_round);
+DECLARE_mInt32(max_automatic_compaction_num_per_round);
DECLARE_mInt32(check_tablet_delete_bitmap_interval_seconds);
DECLARE_mInt32(check_tablet_delete_bitmap_score_top_n);
diff --git a/be/src/olap/olap_server.cpp b/be/src/olap/olap_server.cpp
index b4f56216c67..f3aca7f09d2 100644
--- a/be/src/olap/olap_server.cpp
+++ b/be/src/olap/olap_server.cpp
@@ -98,6 +98,8 @@ using io::Path;
// number of running SCHEMA-CHANGE threads
volatile uint32_t g_schema_change_active_threads = 0;
+bvar::Status<int64_t>
g_cumu_compaction_task_num_per_round("cumu_compaction_task_num_per_round", 0);
+bvar::Status<int64_t>
g_base_compaction_task_num_per_round("base_compaction_task_num_per_round", 0);
static const uint64_t DEFAULT_SEED = 104729;
static const uint64_t MOD_PRIME = 7652413;
@@ -584,6 +586,7 @@ void StorageEngine::_tablet_path_check_callback() {
}
void StorageEngine::_adjust_compaction_thread_num() {
+
TEST_SYNC_POINT_RETURN_WITH_VOID("StorageEngine::_adjust_compaction_thread_num.return_void");
auto base_compaction_threads_num =
get_base_compaction_threads_num(_store_map.size());
if (_base_compaction_thread_pool->max_threads() !=
base_compaction_threads_num) {
int old_max_threads = _base_compaction_thread_pool->max_threads();
@@ -687,6 +690,33 @@ void StorageEngine::_compaction_tasks_producer_callback() {
last_base_score_update_time = cur_time;
}
}
+ std::unique_ptr<ThreadPool>& thread_pool =
+ (compaction_type == CompactionType::CUMULATIVE_COMPACTION)
+ ? _cumu_compaction_thread_pool
+ : _base_compaction_thread_pool;
+ bvar::Status<int64_t>& g_compaction_task_num_per_round =
+ (compaction_type == CompactionType::CUMULATIVE_COMPACTION)
+ ? g_cumu_compaction_task_num_per_round
+ : g_base_compaction_task_num_per_round;
+ if (config::compaction_num_per_round != -1) {
+ _compaction_num_per_round = config::compaction_num_per_round;
+ } else if (thread_pool->get_queue_size() == 0) {
+ // If all tasks in the thread pool queue are executed,
+ // double the number of tasks generated each time,
+ // with a maximum of
config::max_automatic_compaction_num_per_round tasks per generation.
+ if (_compaction_num_per_round <
config::max_automatic_compaction_num_per_round) {
+ _compaction_num_per_round *= 2;
+
g_compaction_task_num_per_round.set_value(_compaction_num_per_round);
+ }
+ } else if (thread_pool->get_queue_size() >
_compaction_num_per_round / 2) {
+ // If the number of tasks queued in the thread pool is greater than
+ // half of the tasks submitted in the previous round,
+ // reduce the number of tasks generated each time by half,
with a minimum of 1.
+ if (_compaction_num_per_round > 1) {
+ _compaction_num_per_round /= 2;
+
g_compaction_task_num_per_round.set_value(_compaction_num_per_round);
+ }
+ }
std::vector<TabletSharedPtr> tablets_compaction =
_generate_compaction_tasks(compaction_type, data_dirs,
check_score);
if (tablets_compaction.size() == 0) {
@@ -946,6 +976,8 @@ bool has_free_compaction_slot(CompactionSubmitRegistry*
registry, DataDir* dir,
std::vector<TabletSharedPtr> StorageEngine::_generate_compaction_tasks(
CompactionType compaction_type, std::vector<DataDir*>& data_dirs, bool
check_score) {
+
TEST_SYNC_POINT_RETURN_WITH_VALUE("olap_server::_generate_compaction_tasks.return_empty",
+ std::vector<TabletSharedPtr> {});
_update_cumulative_compaction_policy();
std::vector<TabletSharedPtr> tablets_compaction;
uint32_t max_compaction_score = 0;
diff --git a/be/src/olap/storage_engine.h b/be/src/olap/storage_engine.h
index c5b9b193162..ebc6a2ede3d 100644
--- a/be/src/olap/storage_engine.h
+++ b/be/src/olap/storage_engine.h
@@ -350,6 +350,8 @@ public:
std::unordered_map<int64_t, std::unique_ptr<TaskWorkerPoolIf>>* workers;
+ int64_t get_compaction_num_per_round() const { return
_compaction_num_per_round; }
+
private:
// Instance should be inited from `static open()`
// MUST NOT be called in other circumstances.
@@ -573,6 +575,8 @@ private:
scoped_refptr<Thread> _check_delete_bitmap_score_thread;
int64_t _last_get_peers_replica_backends_time_ms {0};
+
+ int64_t _compaction_num_per_round {1};
};
// lru cache for create tablet round robin in disks
diff --git a/be/src/olap/tablet_manager.cpp b/be/src/olap/tablet_manager.cpp
index 9251791d601..7e40b0e4cbe 100644
--- a/be/src/olap/tablet_manager.cpp
+++ b/be/src/olap/tablet_manager.cpp
@@ -743,6 +743,8 @@ std::vector<TabletSharedPtr>
TabletManager::find_best_tablets_to_compaction(
uint32_t single_compact_highest_score = 0;
TabletSharedPtr best_tablet;
TabletSharedPtr best_single_compact_tablet;
+ int64_t compaction_num_per_round =
+
ExecEnv::GetInstance()->storage_engine().to_local().get_compaction_num_per_round();
auto cmp = [](TabletScore left, TabletScore right) { return left.score >
right.score; };
std::priority_queue<TabletScore, std::vector<TabletScore>, decltype(cmp)>
top_tablets(cmp);
@@ -809,20 +811,18 @@ std::vector<TabletSharedPtr>
TabletManager::find_best_tablets_to_compaction(
best_single_compact_tablet = tablet_ptr;
}
- if (config::compaction_num_per_round > 1 &&
!tablet_ptr->should_fetch_from_peer()) {
+ if (compaction_num_per_round > 1 &&
!tablet_ptr->should_fetch_from_peer()) {
TabletScore ts;
ts.score = current_compaction_score;
ts.tablet_ptr = tablet_ptr;
- if ((top_tablets.size() >= config::compaction_num_per_round &&
+ if ((top_tablets.size() >= compaction_num_per_round &&
current_compaction_score > top_tablets.top().score) ||
- top_tablets.size() < config::compaction_num_per_round) {
+ top_tablets.size() < compaction_num_per_round) {
top_tablets.push(ts);
- if (top_tablets.size() > config::compaction_num_per_round) {
+ if (top_tablets.size() > compaction_num_per_round) {
top_tablets.pop();
}
- if (current_compaction_score > highest_score) {
- highest_score = current_compaction_score;
- }
+ highest_score = std::max(current_compaction_score,
highest_score);
}
} else {
if (current_compaction_score > highest_score &&
!tablet_ptr->should_fetch_from_peer()) {
diff --git a/be/test/olap/compaction_task_test.cpp
b/be/test/olap/compaction_task_test.cpp
index c00406e6d15..d70d4742362 100644
--- a/be/test/olap/compaction_task_test.cpp
+++ b/be/test/olap/compaction_task_test.cpp
@@ -130,4 +130,157 @@ TEST_F(CompactionTaskTest, TestSubmitCompactionTask) {
EXPECT_EQ(executing_task_num, 2);
}
+TEST_F(CompactionTaskTest, TestAutoSetCompactionIncreaseTaskNum) {
+ auto st = ThreadPoolBuilder("BaseCompactionTaskThreadPool")
+ .set_min_threads(2)
+ .set_max_threads(2)
+ .build(&_storage_engine->_base_compaction_thread_pool);
+ EXPECT_TRUE(st.ok());
+ st = ThreadPoolBuilder("CumuCompactionTaskThreadPool")
+ .set_min_threads(2)
+ .set_max_threads(2)
+ .build(&_storage_engine->_cumu_compaction_thread_pool);
+ EXPECT_TRUE(st.ok());
+ config::disable_auto_compaction = false;
+
+ auto* sp = SyncPoint::get_instance();
+ sp->enable_processing();
+ sp->set_call_back("olap_server::_generate_compaction_tasks.return_empty",
[](auto&& values) {
+ auto* ret = try_any_cast_ret<std::vector<TabletSharedPtr>>(values);
+ ret->second = true;
+ });
+
sp->set_call_back("StorageEngine::_adjust_compaction_thread_num.return_void",
+ [](auto&& args) { *try_any_cast<bool*>(args.back()) =
true; });
+ sp->set_call_back("StorageEngine::_compaction_tasks_producer_callback",
+ [](auto&& values) {
std::this_thread::sleep_for(std::chrono::seconds(1)); });
+
+ Defer defer {[&]() {
+ _storage_engine->_stop_background_threads_latch.count_down();
+ sp->clear_all_call_backs();
+ }};
+
+ config::generate_compaction_tasks_interval_ms = 1000;
+ {
+ // queue size 1
+ // task num 1->1
+ _storage_engine->_cumu_compaction_thread_pool->_total_queued_tasks = 1;
+ // compaction tasks producer thread
+ st = Thread::create(
+ "StorageEngine", "compaction_tasks_producer_thread",
+ [this]() {
this->_storage_engine->_compaction_tasks_producer_callback(); },
+ &_storage_engine->_compaction_tasks_producer_thread);
+ EXPECT_TRUE(st.ok());
+ _storage_engine->_stop_background_threads_latch.count_down();
+ sleep(2);
+ EXPECT_EQ(_storage_engine->get_compaction_num_per_round(), 1);
+ }
+ {
+ // queue size 0
+ // task num 4->8
+ _storage_engine->_cumu_compaction_thread_pool->_total_queued_tasks = 0;
+ _storage_engine->_compaction_num_per_round = 4;
+ // compaction tasks producer thread
+ st = Thread::create(
+ "StorageEngine", "compaction_tasks_producer_thread",
+ [this]() {
this->_storage_engine->_compaction_tasks_producer_callback(); },
+ &_storage_engine->_compaction_tasks_producer_thread);
+ EXPECT_TRUE(st.ok());
+ _storage_engine->_stop_background_threads_latch.count_down();
+ sleep(2);
+ EXPECT_EQ(_storage_engine->get_compaction_num_per_round(), 8);
+ }
+ {
+ // queue size 0
+ // task num 64->64
+ _storage_engine->_cumu_compaction_thread_pool->_total_queued_tasks = 0;
+ _storage_engine->_compaction_num_per_round = 64;
+ // compaction tasks producer thread
+ st = Thread::create(
+ "StorageEngine", "compaction_tasks_producer_thread",
+ [this]() {
this->_storage_engine->_compaction_tasks_producer_callback(); },
+ &_storage_engine->_compaction_tasks_producer_thread);
+ EXPECT_TRUE(st.ok());
+ _storage_engine->_stop_background_threads_latch.count_down();
+ sleep(2);
+ EXPECT_EQ(_storage_engine->get_compaction_num_per_round(), 64);
+ }
+}
+
+TEST_F(CompactionTaskTest, TestAutoSetCompactionDecreaseTaskNum) {
+ auto st = ThreadPoolBuilder("BaseCompactionTaskThreadPool")
+ .set_min_threads(2)
+ .set_max_threads(2)
+ .build(&_storage_engine->_base_compaction_thread_pool);
+ EXPECT_TRUE(st.ok());
+ st = ThreadPoolBuilder("CumuCompactionTaskThreadPool")
+ .set_min_threads(2)
+ .set_max_threads(2)
+ .build(&_storage_engine->_cumu_compaction_thread_pool);
+ EXPECT_TRUE(st.ok());
+ config::disable_auto_compaction = false;
+
+ auto* sp = SyncPoint::get_instance();
+ sp->enable_processing();
+ sp->set_call_back("olap_server::_generate_compaction_tasks.return_empty",
[](auto&& values) {
+ auto* ret = try_any_cast_ret<std::vector<TabletSharedPtr>>(values);
+ ret->second = true;
+ });
+
sp->set_call_back("StorageEngine::_adjust_compaction_thread_num.return_void",
+ [](auto&& args) { *try_any_cast<bool*>(args.back()) =
true; });
+ sp->set_call_back("StorageEngine::_compaction_tasks_producer_callback",
+ [](auto&& values) {
std::this_thread::sleep_for(std::chrono::seconds(1)); });
+
+ Defer defer {[&]() {
+ _storage_engine->_stop_background_threads_latch.count_down();
+ sp->clear_all_call_backs();
+ }};
+
+ config::generate_compaction_tasks_interval_ms = 1000;
+ {
+ // queue size 3
+ // task num 8->8
+ _storage_engine->_compaction_num_per_round = 8;
+ _storage_engine->_cumu_compaction_thread_pool->_total_queued_tasks = 3;
+ // compaction tasks producer thread
+ st = Thread::create(
+ "StorageEngine", "compaction_tasks_producer_thread",
+ [this]() {
this->_storage_engine->_compaction_tasks_producer_callback(); },
+ &_storage_engine->_compaction_tasks_producer_thread);
+ EXPECT_TRUE(st.ok());
+ _storage_engine->_stop_background_threads_latch.count_down();
+ sleep(2);
+ EXPECT_EQ(_storage_engine->get_compaction_num_per_round(), 8);
+ }
+ {
+ // queue size 5
+ // task num 8->4
+ _storage_engine->_cumu_compaction_thread_pool->_total_queued_tasks = 5;
+ _storage_engine->_compaction_num_per_round = 8;
+ // compaction tasks producer thread
+ st = Thread::create(
+ "StorageEngine", "compaction_tasks_producer_thread",
+ [this]() {
this->_storage_engine->_compaction_tasks_producer_callback(); },
+ &_storage_engine->_compaction_tasks_producer_thread);
+ EXPECT_TRUE(st.ok());
+ _storage_engine->_stop_background_threads_latch.count_down();
+ sleep(2);
+ EXPECT_EQ(_storage_engine->get_compaction_num_per_round(), 4);
+ }
+ {
+ // queue size 1
+ // task num 1->1
+ _storage_engine->_cumu_compaction_thread_pool->_total_queued_tasks = 1;
+ _storage_engine->_compaction_num_per_round = 1;
+ // compaction tasks producer thread
+ st = Thread::create(
+ "StorageEngine", "compaction_tasks_producer_thread",
+ [this]() {
this->_storage_engine->_compaction_tasks_producer_callback(); },
+ &_storage_engine->_compaction_tasks_producer_thread);
+ EXPECT_TRUE(st.ok());
+ _storage_engine->_stop_background_threads_latch.count_down();
+ sleep(2);
+ EXPECT_EQ(_storage_engine->get_compaction_num_per_round(), 1);
+ }
+}
+
} // namespace doris
diff --git a/be/test/olap/tablet_mgr_test.cpp b/be/test/olap/tablet_mgr_test.cpp
index 7b940ffa0fa..4096c9ed86e 100644
--- a/be/test/olap/tablet_mgr_test.cpp
+++ b/be/test/olap/tablet_mgr_test.cpp
@@ -73,7 +73,9 @@ public:
EngineOptions options;
// won't open engine, options.path is needless
options.backend_uid = UniqueId::gen_uid();
- k_engine = std::make_unique<StorageEngine>(options);
+ auto engine = std::make_unique<StorageEngine>(options);
+ ExecEnv::GetInstance()->set_storage_engine(std::move(engine));
+ k_engine = &ExecEnv::GetInstance()->storage_engine().to_local();
_data_dir = new DataDir(*k_engine, _engine_data_path, 1000000000);
static_cast<void>(_data_dir->init());
_tablet_mgr = k_engine->tablet_manager();
@@ -82,10 +84,11 @@ public:
virtual void TearDown() {
SAFE_DELETE(_data_dir);
EXPECT_TRUE(io::global_local_filesystem()->delete_directory(_engine_data_path).ok());
+ ExecEnv::GetInstance()->set_storage_engine(nullptr);
_tablet_mgr = nullptr;
config::compaction_num_per_round = 1;
}
- std::unique_ptr<StorageEngine> k_engine;
+ StorageEngine* k_engine;
private:
DataDir* _data_dir = nullptr;
@@ -470,7 +473,7 @@ TEST_F(TabletMgrTest, FindTabletWithCompact) {
}
{
- config::compaction_num_per_round = 10;
+ k_engine->_compaction_num_per_round = 10;
for (int64_t i = 1; i <= 100; ++i) {
create_tablet(10000 + i, false, i);
}
@@ -488,7 +491,7 @@ TEST_F(TabletMgrTest, FindTabletWithCompact) {
100 - index - 1);
index++;
}
- config::compaction_num_per_round = 1;
+ k_engine->_compaction_num_per_round = 1;
// drop all tablets
for (int64_t id = 10001; id <= 10100; ++id) {
Status drop_st = _tablet_mgr->drop_tablet(id, id * 10, false);
@@ -497,7 +500,7 @@ TEST_F(TabletMgrTest, FindTabletWithCompact) {
}
{
- config::compaction_num_per_round = 10;
+ k_engine->_compaction_num_per_round = 10;
for (int64_t i = 1; i <= 100; ++i) {
create_tablet(20000 + i, false, i);
}
@@ -520,7 +523,7 @@ TEST_F(TabletMgrTest, FindTabletWithCompact) {
cumulative_compaction_policies[CUMULATIVE_SIZE_BASED_POLICY]),
200 - 1);
- config::compaction_num_per_round = 1;
+ k_engine->_compaction_num_per_round = 1;
// drop all tablets
for (int64_t id = 20001; id <= 20100; ++id) {
Status drop_st = _tablet_mgr->drop_tablet(id, id * 10, false);
@@ -532,7 +535,7 @@ TEST_F(TabletMgrTest, FindTabletWithCompact) {
}
{
- config::compaction_num_per_round = 10;
+ k_engine->_compaction_num_per_round = 10;
for (int64_t i = 1; i <= 5; ++i) {
create_tablet(30000 + i, false, i + 5);
}
@@ -549,7 +552,7 @@ TEST_F(TabletMgrTest, FindTabletWithCompact) {
10 - i - 1);
}
- config::compaction_num_per_round = 1;
+ k_engine->_compaction_num_per_round = 1;
// drop all tablets
for (int64_t id = 30001; id <= 30005; ++id) {
Status drop_st = _tablet_mgr->drop_tablet(id, id * 10, false);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]