This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 85d895abf9d [Cherry-Pick](branch-3.0) Pick "[Enhancement](Compaction) Support auto set compaction task num per round #53408" (#53776)
85d895abf9d is described below
commit 85d895abf9dcfe7480474326f4228b0dab28ca71
Author: abmdocrt <[email protected]>
AuthorDate: Sun Aug 24 15:19:48 2025 +0800
[Cherry-Pick](branch-3.0) Pick "[Enhancement](Compaction) Support auto set compaction task num per round #53408" (#53776)
Pick #53408
English:
This PR makes the number of compaction tasks generated per round adaptive.
Before this PR: By default, compaction tasks are generated every 100ms. Each round
produces the top-n tasks with the highest compaction scores, where n is set by the
compaction_num_per_round parameter (default 4).
After this PR: Compaction tasks are still generated every 100ms by default. When
compaction_num_per_round is not -1, it still sets n directly. When
compaction_num_per_round is -1 (the new default), n is no longer fixed and instead
adapts to how quickly the compaction thread pool consumes tasks.
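The top-n selection itself (done in `TabletManager::find_best_tablets_to_compaction()` when the per-round count is greater than 1) keeps a min-heap ordered by compaction score, as in the `tablet_manager.cpp` hunk of this commit. The following is a minimal standalone sketch of that technique; the `Candidate` struct and `top_n_by_score` function are hypothetical simplifications, not Doris's `TabletScore`/`TabletSharedPtr` types.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <queue>
#include <vector>

// Hypothetical, simplified stand-in for a tablet and its compaction score.
struct Candidate {
    int64_t tablet_id;
    uint32_t score;
};

// Returns the n highest-scoring candidates, highest first. A min-heap capped at
// n elements is used: its top is the weakest of the current top-n, so each new
// candidate only has to be compared against that single element.
std::vector<Candidate> top_n_by_score(const std::vector<Candidate>& candidates, int64_t n) {
    assert(n >= 0);
    // A "greater" comparator turns std::priority_queue into a min-heap on score.
    auto cmp = [](const Candidate& l, const Candidate& r) { return l.score > r.score; };
    std::priority_queue<Candidate, std::vector<Candidate>, decltype(cmp)> heap(cmp);
    for (const Candidate& c : candidates) {
        if (static_cast<int64_t>(heap.size()) < n) {
            heap.push(c);  // heap not full yet
        } else if (n > 0 && c.score > heap.top().score) {
            heap.push(c);  // beats the current minimum:
            heap.pop();    // keep it and evict the minimum
        }
    }
    std::vector<Candidate> result;
    while (!heap.empty()) {  // pops in ascending score order
        result.push_back(heap.top());
        heap.pop();
    }
    std::reverse(result.begin(), result.end());
    return result;
}
```

Each candidate costs O(log n), so the scan stays cheap even when the adaptive count grows toward its cap.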
Implementation details:
- The per-round count starts at 1.
- Before each round, the producer checks how many tasks are still waiting in the queue of the corresponding compaction thread pool (cumulative or base).
- If the queue is empty (size 0), tasks are being generated too slowly, so the per-round count is doubled, as long as it is below max_automatic_compaction_num_per_round (default 64).
- If the queue size is greater than half of the per-round count, tasks are being generated too quickly, so the count is halved (minimum 1).
- Otherwise, the count is left unchanged.
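The adjustment rule can be written as a pure function. This sketch mirrors the branch added to `StorageEngine::_compaction_tasks_producer_callback()`; the name `next_compaction_num_per_round` and its parameters are hypothetical, and the queue size is passed in as a plain number instead of being read from the thread pool.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical helper mirroring the adjustment branch added in this commit.
//   configured: config::compaction_num_per_round (-1 means automatic)
//   max_auto:   config::max_automatic_compaction_num_per_round
//   queue_size: tasks still waiting in the compaction thread pool queue
//   current:    per-round count used in the previous round
int64_t next_compaction_num_per_round(int64_t configured, int64_t max_auto, int64_t queue_size,
                                      int64_t current) {
    assert(queue_size >= 0);
    if (configured != -1) {
        return configured;  // fixed mode: use the configured value as-is
    }
    if (queue_size == 0) {
        // Queue fully drained: generation is too slow, so double while below the cap.
        if (current < max_auto) {
            return current * 2;
        }
    } else if (queue_size > current / 2) {
        // More than half a batch still queued: generation is too fast, so halve (min 1).
        if (current > 1) {
            return current / 2;
        }
    }
    return current;  // otherwise keep the previous count
}
```

The cases in `compaction_task_test.cpp` correspond to calls such as `next_compaction_num_per_round(-1, 64, 0, 4) == 8` and `next_compaction_num_per_round(-1, 64, 5, 8) == 4`. Because the cap is checked before doubling, a count that starts at 1 lands exactly on the cap only when the cap is a power of two; with a cap of 50 it goes from 32 to 64. The adjustment also runs before `_generate_compaction_tasks()`, so with the initial value of 1 and an empty queue the first round already asks for 2 tasks.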
Summary: This implementation achieves adaptive compaction task count
generation.
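To see how this feedback loop behaves over several rounds, here is a toy discrete-time model. It is an assumption-laden sketch, not Doris behavior: a single pool that finishes exactly `drain_per_round` queued tasks between producer rounds, and always enough eligible tablets to fill a batch. In the diff, the cumulative and base branches share one `_compaction_num_per_round`, which this model ignores; `simulate_rounds` is a hypothetical name.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <vector>

// Toy model, not Doris code. Returns the per-round count chosen in each round.
std::vector<int64_t> simulate_rounds(int64_t drain_per_round, int64_t max_auto, int rounds) {
    assert(drain_per_round >= 0 && rounds >= 0);
    int64_t n = 1;      // initial value, as for StorageEngine::_compaction_num_per_round
    int64_t queue = 0;  // tasks waiting in the pool queue
    std::vector<int64_t> history;
    for (int r = 0; r < rounds; ++r) {
        // Adjust before generating tasks, as the producer callback does.
        if (queue == 0) {
            if (n < max_auto) n *= 2;
        } else if (queue > n / 2) {
            if (n > 1) n /= 2;
        }
        history.push_back(n);
        queue += n;                                 // submit a batch of n tasks
        queue -= std::min(queue, drain_per_round);  // pool works through the queue
    }
    return history;
}
```

With `drain_per_round = 5` and a cap of 64, the modeled count does not settle: it cycles through 2, 4, 8, 8, 4, 2, 1 and repeats, averaging a little over 4 tasks per round. With a pool that drains faster than the cap (for example 100 per round), it ramps through 2, 4, 8, 16, 32, 64 and then stays at 64.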
### What problem does this PR solve?
Issue Number: close #xxx
Related PR: #xxx
Problem Summary:
### Release note
None
### Check List (For Author)
- Test
- [ ] Regression test
- [ ] Unit Test
- [ ] Manual test (add detailed scripts or steps below)
- [ ] No need to test or manual test. Explain why:
- [ ] This is a refactor/code format and no logic has been changed.
- [ ] Previous test can cover this change.
- [ ] No code files have been changed.
- [ ] Other reason
- Behavior changed:
- [ ] No.
- [ ] Yes.
- Does this need documentation?
- [ ] No.
- [ ] Yes.
### Check List (For Reviewer who merge this PR)
- [ ] Confirm the release note
- [ ] Confirm test cases
- [ ] Confirm document
- [ ] Add branch pick label
---
be/src/common/config.cpp | 7 +-
be/src/common/config.h | 1 +
be/src/olap/olap_server.cpp | 32 +++++++
be/src/olap/storage_engine.h | 4 +
be/src/olap/tablet_manager.cpp | 14 ++--
be/test/olap/compaction_task_test.cpp | 153 ++++++++++++++++++++++++++++++++++
be/test/olap/tablet_mgr_test.cpp | 19 +++--
7 files changed, 214 insertions(+), 16 deletions(-)
diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 0989fd38f18..34b68cfe021 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -1501,7 +1501,12 @@ DEFINE_Bool(enable_table_size_correctness_check, "false");
DEFINE_Bool(force_regenerate_rowsetid_on_start_error, "false");
DEFINE_mBool(enable_sleep_between_delete_cumu_compaction, "false");
-DEFINE_mInt32(compaction_num_per_round, "4");
+// The number of compaction tasks generated each time.
+// -1 means automatic number, other values mean fixed number.
+DEFINE_mInt32(compaction_num_per_round, "-1");
+// Max automatic compaction task generated num per round.
+// Only valid if "compaction_num_per_round = -1"
+DEFINE_mInt32(max_automatic_compaction_num_per_round, "64");
DEFINE_mInt32(check_tablet_delete_bitmap_interval_seconds, "300");
DEFINE_mInt32(check_tablet_delete_bitmap_score_top_n, "10");
diff --git a/be/src/common/config.h b/be/src/common/config.h
index a36f185a2c2..cc324c11b6b 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -1571,6 +1571,7 @@ DECLARE_Bool(enable_table_size_correctness_check);
DECLARE_mBool(enable_sleep_between_delete_cumu_compaction);
DECLARE_mInt32(compaction_num_per_round);
+DECLARE_mInt32(max_automatic_compaction_num_per_round);
DECLARE_mInt32(check_tablet_delete_bitmap_interval_seconds);
DECLARE_mInt32(check_tablet_delete_bitmap_score_top_n);
diff --git a/be/src/olap/olap_server.cpp b/be/src/olap/olap_server.cpp
index 36971ba5e9b..fe441522726 100644
--- a/be/src/olap/olap_server.cpp
+++ b/be/src/olap/olap_server.cpp
@@ -98,6 +98,8 @@ using io::Path;
// number of running SCHEMA-CHANGE threads
volatile uint32_t g_schema_change_active_threads = 0;
+bvar::Status<int64_t> g_cumu_compaction_task_num_per_round("cumu_compaction_task_num_per_round", 0);
+bvar::Status<int64_t> g_base_compaction_task_num_per_round("base_compaction_task_num_per_round", 0);
static const uint64_t DEFAULT_SEED = 104729;
static const uint64_t MOD_PRIME = 7652413;
@@ -584,6 +586,7 @@ void StorageEngine::_tablet_path_check_callback() {
}
void StorageEngine::_adjust_compaction_thread_num() {
+TEST_SYNC_POINT_RETURN_WITH_VOID("StorageEngine::_adjust_compaction_thread_num.return_void");
auto base_compaction_threads_num = get_base_compaction_threads_num(_store_map.size());
if (_base_compaction_thread_pool->max_threads() != base_compaction_threads_num) {
int old_max_threads = _base_compaction_thread_pool->max_threads();
@@ -687,6 +690,33 @@ void StorageEngine::_compaction_tasks_producer_callback() {
last_base_score_update_time = cur_time;
}
}
+ std::unique_ptr<ThreadPool>& thread_pool =
+ (compaction_type == CompactionType::CUMULATIVE_COMPACTION)
+ ? _cumu_compaction_thread_pool
+ : _base_compaction_thread_pool;
+ bvar::Status<int64_t>& g_compaction_task_num_per_round =
+ (compaction_type == CompactionType::CUMULATIVE_COMPACTION)
+ ? g_cumu_compaction_task_num_per_round
+ : g_base_compaction_task_num_per_round;
+ if (config::compaction_num_per_round != -1) {
+ _compaction_num_per_round = config::compaction_num_per_round;
+ } else if (thread_pool->get_queue_size() == 0) {
+ // If all tasks in the thread pool queue are executed,
+ // double the number of tasks generated each time,
+ // with a maximum of config::max_automatic_compaction_num_per_round tasks per generation.
+ if (_compaction_num_per_round < config::max_automatic_compaction_num_per_round) {
+ _compaction_num_per_round *= 2;
+ g_compaction_task_num_per_round.set_value(_compaction_num_per_round);
+ }
+ } else if (thread_pool->get_queue_size() > _compaction_num_per_round / 2) {
+ // If the number of tasks queued in the thread pool is greater than
+ // half of the tasks submitted in the previous round,
+ // reduce the number of tasks generated each time by half, with a minimum of 1.
+ if (_compaction_num_per_round > 1) {
+ _compaction_num_per_round /= 2;
+ g_compaction_task_num_per_round.set_value(_compaction_num_per_round);
+ }
+ }
std::vector<TabletSharedPtr> tablets_compaction =
_generate_compaction_tasks(compaction_type, data_dirs, check_score);
if (tablets_compaction.size() == 0) {
@@ -946,6 +976,8 @@ bool has_free_compaction_slot(CompactionSubmitRegistry* registry, DataDir* dir,
std::vector<TabletSharedPtr> StorageEngine::_generate_compaction_tasks(
CompactionType compaction_type, std::vector<DataDir*>& data_dirs, bool check_score) {
+TEST_SYNC_POINT_RETURN_WITH_VALUE("olap_server::_generate_compaction_tasks.return_empty",
+ std::vector<TabletSharedPtr> {});
_update_cumulative_compaction_policy();
std::vector<TabletSharedPtr> tablets_compaction;
uint32_t max_compaction_score = 0;
diff --git a/be/src/olap/storage_engine.h b/be/src/olap/storage_engine.h
index 9dd780956f5..50b82d67232 100644
--- a/be/src/olap/storage_engine.h
+++ b/be/src/olap/storage_engine.h
@@ -342,6 +342,8 @@ public:
std::unordered_map<int64_t, std::unique_ptr<TaskWorkerPoolIf>>* workers;
+ int64_t get_compaction_num_per_round() const { return _compaction_num_per_round; }
+
private:
// Instance should be inited from `static open()`
// MUST NOT be called in other circumstances.
@@ -562,6 +564,8 @@ private:
scoped_refptr<Thread> _check_delete_bitmap_score_thread;
int64_t _last_get_peers_replica_backends_time_ms {0};
+
+ int64_t _compaction_num_per_round {1};
};
// lru cache for create tablet round robin in disks
diff --git a/be/src/olap/tablet_manager.cpp b/be/src/olap/tablet_manager.cpp
index 3e7d48e44af..b6b2d3b2293 100644
--- a/be/src/olap/tablet_manager.cpp
+++ b/be/src/olap/tablet_manager.cpp
@@ -743,6 +743,8 @@ std::vector<TabletSharedPtr> TabletManager::find_best_tablets_to_compaction(
uint32_t single_compact_highest_score = 0;
TabletSharedPtr best_tablet;
TabletSharedPtr best_single_compact_tablet;
+ int64_t compaction_num_per_round =
+ ExecEnv::GetInstance()->storage_engine().to_local().get_compaction_num_per_round();
auto cmp = [](TabletScore left, TabletScore right) { return left.score > right.score; };
std::priority_queue<TabletScore, std::vector<TabletScore>, decltype(cmp)> top_tablets(cmp);
@@ -809,20 +811,18 @@ std::vector<TabletSharedPtr> TabletManager::find_best_tablets_to_compaction(
best_single_compact_tablet = tablet_ptr;
}
- if (config::compaction_num_per_round > 1 && !tablet_ptr->should_fetch_from_peer()) {
+ if (compaction_num_per_round > 1 && !tablet_ptr->should_fetch_from_peer()) {
TabletScore ts;
ts.score = current_compaction_score;
ts.tablet_ptr = tablet_ptr;
- if ((top_tablets.size() >= config::compaction_num_per_round &&
+ if ((top_tablets.size() >= compaction_num_per_round &&
current_compaction_score > top_tablets.top().score) ||
- top_tablets.size() < config::compaction_num_per_round) {
+ top_tablets.size() < compaction_num_per_round) {
top_tablets.push(ts);
- if (top_tablets.size() > config::compaction_num_per_round) {
+ if (top_tablets.size() > compaction_num_per_round) {
top_tablets.pop();
}
- if (current_compaction_score > highest_score) {
- highest_score = current_compaction_score;
- }
+ highest_score = std::max(current_compaction_score, highest_score);
}
} else {
if (current_compaction_score > highest_score &&
!tablet_ptr->should_fetch_from_peer()) {
diff --git a/be/test/olap/compaction_task_test.cpp b/be/test/olap/compaction_task_test.cpp
index c00406e6d15..d70d4742362 100644
--- a/be/test/olap/compaction_task_test.cpp
+++ b/be/test/olap/compaction_task_test.cpp
@@ -130,4 +130,157 @@ TEST_F(CompactionTaskTest, TestSubmitCompactionTask) {
EXPECT_EQ(executing_task_num, 2);
}
+TEST_F(CompactionTaskTest, TestAutoSetCompactionIncreaseTaskNum) {
+ auto st = ThreadPoolBuilder("BaseCompactionTaskThreadPool")
+ .set_min_threads(2)
+ .set_max_threads(2)
+ .build(&_storage_engine->_base_compaction_thread_pool);
+ EXPECT_TRUE(st.ok());
+ st = ThreadPoolBuilder("CumuCompactionTaskThreadPool")
+ .set_min_threads(2)
+ .set_max_threads(2)
+ .build(&_storage_engine->_cumu_compaction_thread_pool);
+ EXPECT_TRUE(st.ok());
+ config::disable_auto_compaction = false;
+
+ auto* sp = SyncPoint::get_instance();
+ sp->enable_processing();
+ sp->set_call_back("olap_server::_generate_compaction_tasks.return_empty", [](auto&& values) {
+ auto* ret = try_any_cast_ret<std::vector<TabletSharedPtr>>(values);
+ ret->second = true;
+ });
+ sp->set_call_back("StorageEngine::_adjust_compaction_thread_num.return_void",
+ [](auto&& args) { *try_any_cast<bool*>(args.back()) = true; });
+ sp->set_call_back("StorageEngine::_compaction_tasks_producer_callback",
+ [](auto&& values) { std::this_thread::sleep_for(std::chrono::seconds(1)); });
+
+ Defer defer {[&]() {
+ _storage_engine->_stop_background_threads_latch.count_down();
+ sp->clear_all_call_backs();
+ }};
+
+ config::generate_compaction_tasks_interval_ms = 1000;
+ {
+ // queue size 1
+ // task num 1->1
+ _storage_engine->_cumu_compaction_thread_pool->_total_queued_tasks = 1;
+ // compaction tasks producer thread
+ st = Thread::create(
+ "StorageEngine", "compaction_tasks_producer_thread",
+ [this]() { this->_storage_engine->_compaction_tasks_producer_callback(); },
+ &_storage_engine->_compaction_tasks_producer_thread);
+ EXPECT_TRUE(st.ok());
+ _storage_engine->_stop_background_threads_latch.count_down();
+ sleep(2);
+ EXPECT_EQ(_storage_engine->get_compaction_num_per_round(), 1);
+ }
+ {
+ // queue size 0
+ // task num 4->8
+ _storage_engine->_cumu_compaction_thread_pool->_total_queued_tasks = 0;
+ _storage_engine->_compaction_num_per_round = 4;
+ // compaction tasks producer thread
+ st = Thread::create(
+ "StorageEngine", "compaction_tasks_producer_thread",
+ [this]() { this->_storage_engine->_compaction_tasks_producer_callback(); },
+ &_storage_engine->_compaction_tasks_producer_thread);
+ EXPECT_TRUE(st.ok());
+ _storage_engine->_stop_background_threads_latch.count_down();
+ sleep(2);
+ EXPECT_EQ(_storage_engine->get_compaction_num_per_round(), 8);
+ }
+ {
+ // queue size 0
+ // task num 64->64
+ _storage_engine->_cumu_compaction_thread_pool->_total_queued_tasks = 0;
+ _storage_engine->_compaction_num_per_round = 64;
+ // compaction tasks producer thread
+ st = Thread::create(
+ "StorageEngine", "compaction_tasks_producer_thread",
+ [this]() { this->_storage_engine->_compaction_tasks_producer_callback(); },
+ &_storage_engine->_compaction_tasks_producer_thread);
+ EXPECT_TRUE(st.ok());
+ _storage_engine->_stop_background_threads_latch.count_down();
+ sleep(2);
+ EXPECT_EQ(_storage_engine->get_compaction_num_per_round(), 64);
+ }
+}
+
+TEST_F(CompactionTaskTest, TestAutoSetCompactionDecreaseTaskNum) {
+ auto st = ThreadPoolBuilder("BaseCompactionTaskThreadPool")
+ .set_min_threads(2)
+ .set_max_threads(2)
+ .build(&_storage_engine->_base_compaction_thread_pool);
+ EXPECT_TRUE(st.ok());
+ st = ThreadPoolBuilder("CumuCompactionTaskThreadPool")
+ .set_min_threads(2)
+ .set_max_threads(2)
+ .build(&_storage_engine->_cumu_compaction_thread_pool);
+ EXPECT_TRUE(st.ok());
+ config::disable_auto_compaction = false;
+
+ auto* sp = SyncPoint::get_instance();
+ sp->enable_processing();
+ sp->set_call_back("olap_server::_generate_compaction_tasks.return_empty", [](auto&& values) {
+ auto* ret = try_any_cast_ret<std::vector<TabletSharedPtr>>(values);
+ ret->second = true;
+ });
+ sp->set_call_back("StorageEngine::_adjust_compaction_thread_num.return_void",
+ [](auto&& args) { *try_any_cast<bool*>(args.back()) = true; });
+ sp->set_call_back("StorageEngine::_compaction_tasks_producer_callback",
+ [](auto&& values) { std::this_thread::sleep_for(std::chrono::seconds(1)); });
+
+ Defer defer {[&]() {
+ _storage_engine->_stop_background_threads_latch.count_down();
+ sp->clear_all_call_backs();
+ }};
+
+ config::generate_compaction_tasks_interval_ms = 1000;
+ {
+ // queue size 3
+ // task num 8->8
+ _storage_engine->_compaction_num_per_round = 8;
+ _storage_engine->_cumu_compaction_thread_pool->_total_queued_tasks = 3;
+ // compaction tasks producer thread
+ st = Thread::create(
+ "StorageEngine", "compaction_tasks_producer_thread",
+ [this]() { this->_storage_engine->_compaction_tasks_producer_callback(); },
+ &_storage_engine->_compaction_tasks_producer_thread);
+ EXPECT_TRUE(st.ok());
+ _storage_engine->_stop_background_threads_latch.count_down();
+ sleep(2);
+ EXPECT_EQ(_storage_engine->get_compaction_num_per_round(), 8);
+ }
+ {
+ // queue size 5
+ // task num 8->4
+ _storage_engine->_cumu_compaction_thread_pool->_total_queued_tasks = 5;
+ _storage_engine->_compaction_num_per_round = 8;
+ // compaction tasks producer thread
+ st = Thread::create(
+ "StorageEngine", "compaction_tasks_producer_thread",
+ [this]() { this->_storage_engine->_compaction_tasks_producer_callback(); },
+ &_storage_engine->_compaction_tasks_producer_thread);
+ EXPECT_TRUE(st.ok());
+ _storage_engine->_stop_background_threads_latch.count_down();
+ sleep(2);
+ EXPECT_EQ(_storage_engine->get_compaction_num_per_round(), 4);
+ }
+ {
+ // queue size 1
+ // task num 1->1
+ _storage_engine->_cumu_compaction_thread_pool->_total_queued_tasks = 1;
+ _storage_engine->_compaction_num_per_round = 1;
+ // compaction tasks producer thread
+ st = Thread::create(
+ "StorageEngine", "compaction_tasks_producer_thread",
+ [this]() { this->_storage_engine->_compaction_tasks_producer_callback(); },
+ &_storage_engine->_compaction_tasks_producer_thread);
+ EXPECT_TRUE(st.ok());
+ _storage_engine->_stop_background_threads_latch.count_down();
+ sleep(2);
+ EXPECT_EQ(_storage_engine->get_compaction_num_per_round(), 1);
+ }
+}
+
} // namespace doris
diff --git a/be/test/olap/tablet_mgr_test.cpp b/be/test/olap/tablet_mgr_test.cpp
index 7b940ffa0fa..4096c9ed86e 100644
--- a/be/test/olap/tablet_mgr_test.cpp
+++ b/be/test/olap/tablet_mgr_test.cpp
@@ -73,7 +73,9 @@ public:
EngineOptions options;
// won't open engine, options.path is needless
options.backend_uid = UniqueId::gen_uid();
- k_engine = std::make_unique<StorageEngine>(options);
+ auto engine = std::make_unique<StorageEngine>(options);
+ ExecEnv::GetInstance()->set_storage_engine(std::move(engine));
+ k_engine = &ExecEnv::GetInstance()->storage_engine().to_local();
_data_dir = new DataDir(*k_engine, _engine_data_path, 1000000000);
static_cast<void>(_data_dir->init());
_tablet_mgr = k_engine->tablet_manager();
@@ -82,10 +84,11 @@ public:
virtual void TearDown() {
SAFE_DELETE(_data_dir);
EXPECT_TRUE(io::global_local_filesystem()->delete_directory(_engine_data_path).ok());
+ ExecEnv::GetInstance()->set_storage_engine(nullptr);
_tablet_mgr = nullptr;
config::compaction_num_per_round = 1;
}
- std::unique_ptr<StorageEngine> k_engine;
+ StorageEngine* k_engine;
private:
DataDir* _data_dir = nullptr;
@@ -470,7 +473,7 @@ TEST_F(TabletMgrTest, FindTabletWithCompact) {
}
{
- config::compaction_num_per_round = 10;
+ k_engine->_compaction_num_per_round = 10;
for (int64_t i = 1; i <= 100; ++i) {
create_tablet(10000 + i, false, i);
}
@@ -488,7 +491,7 @@ TEST_F(TabletMgrTest, FindTabletWithCompact) {
100 - index - 1);
index++;
}
- config::compaction_num_per_round = 1;
+ k_engine->_compaction_num_per_round = 1;
// drop all tablets
for (int64_t id = 10001; id <= 10100; ++id) {
Status drop_st = _tablet_mgr->drop_tablet(id, id * 10, false);
@@ -497,7 +500,7 @@ TEST_F(TabletMgrTest, FindTabletWithCompact) {
}
{
- config::compaction_num_per_round = 10;
+ k_engine->_compaction_num_per_round = 10;
for (int64_t i = 1; i <= 100; ++i) {
create_tablet(20000 + i, false, i);
}
@@ -520,7 +523,7 @@ TEST_F(TabletMgrTest, FindTabletWithCompact) {
cumulative_compaction_policies[CUMULATIVE_SIZE_BASED_POLICY]),
200 - 1);
- config::compaction_num_per_round = 1;
+ k_engine->_compaction_num_per_round = 1;
// drop all tablets
for (int64_t id = 20001; id <= 20100; ++id) {
Status drop_st = _tablet_mgr->drop_tablet(id, id * 10, false);
@@ -532,7 +535,7 @@ TEST_F(TabletMgrTest, FindTabletWithCompact) {
}
{
- config::compaction_num_per_round = 10;
+ k_engine->_compaction_num_per_round = 10;
for (int64_t i = 1; i <= 5; ++i) {
create_tablet(30000 + i, false, i + 5);
}
@@ -549,7 +552,7 @@ TEST_F(TabletMgrTest, FindTabletWithCompact) {
10 - i - 1);
}
- config::compaction_num_per_round = 1;
+ k_engine->_compaction_num_per_round = 1;
// drop all tablets
for (int64_t id = 30001; id <= 30005; ++id) {
Status drop_st = _tablet_mgr->drop_tablet(id, id * 10, false);