This is an automated email from the ASF dual-hosted git repository.

morrysnow pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new 5e222ecc27a branch-3.1: [Enhancement](Compaction) Support auto set 
compaction task num per round #53408 (#53775)
5e222ecc27a is described below

commit 5e222ecc27ae615872738e1a79b4c23990c701ab
Author: abmdocrt <[email protected]>
AuthorDate: Fri Aug 1 10:10:09 2025 +0800

    branch-3.1: [Enhancement](Compaction) Support auto set compaction task num 
per round #53408 (#53775)
    
    Pick #53408
---
 be/src/common/config.cpp              |   7 +-
 be/src/common/config.h                |   1 +
 be/src/olap/olap_server.cpp           |  32 +++++++
 be/src/olap/storage_engine.h          |   4 +
 be/src/olap/tablet_manager.cpp        |  14 ++--
 be/test/olap/compaction_task_test.cpp | 153 ++++++++++++++++++++++++++++++++++
 be/test/olap/tablet_mgr_test.cpp      |  19 +++--
 7 files changed, 214 insertions(+), 16 deletions(-)

diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index b3f56b285db..252600723e5 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -1508,7 +1508,12 @@ DEFINE_Bool(enable_table_size_correctness_check, 
"false");
 DEFINE_Bool(force_regenerate_rowsetid_on_start_error, "false");
 DEFINE_mBool(enable_sleep_between_delete_cumu_compaction, "false");
 
-DEFINE_mInt32(compaction_num_per_round, "4");
+// The number of compaction tasks generated each time.
+// -1 means automatic number, other values mean fixed number.
+DEFINE_mInt32(compaction_num_per_round, "-1");
+// Max automatic compaction task generated num per round.
+// Only valid if "compaction_num_per_round = -1"
+DEFINE_mInt32(max_automatic_compaction_num_per_round, "64");
 
 DEFINE_mInt32(check_tablet_delete_bitmap_interval_seconds, "300");
 DEFINE_mInt32(check_tablet_delete_bitmap_score_top_n, "10");
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 1ee4b9d6821..2a9ba7bd293 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -1582,6 +1582,7 @@ DECLARE_Bool(enable_table_size_correctness_check);
 DECLARE_mBool(enable_sleep_between_delete_cumu_compaction);
 
 DECLARE_mInt32(compaction_num_per_round);
+DECLARE_mInt32(max_automatic_compaction_num_per_round);
 
 DECLARE_mInt32(check_tablet_delete_bitmap_interval_seconds);
 DECLARE_mInt32(check_tablet_delete_bitmap_score_top_n);
diff --git a/be/src/olap/olap_server.cpp b/be/src/olap/olap_server.cpp
index b4f56216c67..f3aca7f09d2 100644
--- a/be/src/olap/olap_server.cpp
+++ b/be/src/olap/olap_server.cpp
@@ -98,6 +98,8 @@ using io::Path;
 
 // number of running SCHEMA-CHANGE threads
 volatile uint32_t g_schema_change_active_threads = 0;
+bvar::Status<int64_t> 
g_cumu_compaction_task_num_per_round("cumu_compaction_task_num_per_round", 0);
+bvar::Status<int64_t> 
g_base_compaction_task_num_per_round("base_compaction_task_num_per_round", 0);
 
 static const uint64_t DEFAULT_SEED = 104729;
 static const uint64_t MOD_PRIME = 7652413;
@@ -584,6 +586,7 @@ void StorageEngine::_tablet_path_check_callback() {
 }
 
 void StorageEngine::_adjust_compaction_thread_num() {
+    
TEST_SYNC_POINT_RETURN_WITH_VOID("StorageEngine::_adjust_compaction_thread_num.return_void");
     auto base_compaction_threads_num = 
get_base_compaction_threads_num(_store_map.size());
     if (_base_compaction_thread_pool->max_threads() != 
base_compaction_threads_num) {
         int old_max_threads = _base_compaction_thread_pool->max_threads();
@@ -687,6 +690,33 @@ void StorageEngine::_compaction_tasks_producer_callback() {
                     last_base_score_update_time = cur_time;
                 }
             }
+            std::unique_ptr<ThreadPool>& thread_pool =
+                    (compaction_type == CompactionType::CUMULATIVE_COMPACTION)
+                            ? _cumu_compaction_thread_pool
+                            : _base_compaction_thread_pool;
+            bvar::Status<int64_t>& g_compaction_task_num_per_round =
+                    (compaction_type == CompactionType::CUMULATIVE_COMPACTION)
+                            ? g_cumu_compaction_task_num_per_round
+                            : g_base_compaction_task_num_per_round;
+            if (config::compaction_num_per_round != -1) {
+                _compaction_num_per_round = config::compaction_num_per_round;
+            } else if (thread_pool->get_queue_size() == 0) {
+                // If all tasks in the thread pool queue are executed,
+                // double the number of tasks generated each time,
+                // with a maximum of 
config::max_automatic_compaction_num_per_round tasks per generation.
+                if (_compaction_num_per_round < 
config::max_automatic_compaction_num_per_round) {
+                    _compaction_num_per_round *= 2;
+                    
g_compaction_task_num_per_round.set_value(_compaction_num_per_round);
+                }
+            } else if (thread_pool->get_queue_size() > 
_compaction_num_per_round / 2) {
+                // If the number of tasks queued in the thread pool is greater than
+                // half of the tasks submitted in the previous round,
+                // reduce the number of tasks generated each time by half, 
with a minimum of 1.
+                if (_compaction_num_per_round > 1) {
+                    _compaction_num_per_round /= 2;
+                    
g_compaction_task_num_per_round.set_value(_compaction_num_per_round);
+                }
+            }
             std::vector<TabletSharedPtr> tablets_compaction =
                     _generate_compaction_tasks(compaction_type, data_dirs, 
check_score);
             if (tablets_compaction.size() == 0) {
@@ -946,6 +976,8 @@ bool has_free_compaction_slot(CompactionSubmitRegistry* 
registry, DataDir* dir,
 
 std::vector<TabletSharedPtr> StorageEngine::_generate_compaction_tasks(
         CompactionType compaction_type, std::vector<DataDir*>& data_dirs, bool 
check_score) {
+    
TEST_SYNC_POINT_RETURN_WITH_VALUE("olap_server::_generate_compaction_tasks.return_empty",
+                                      std::vector<TabletSharedPtr> {});
     _update_cumulative_compaction_policy();
     std::vector<TabletSharedPtr> tablets_compaction;
     uint32_t max_compaction_score = 0;
diff --git a/be/src/olap/storage_engine.h b/be/src/olap/storage_engine.h
index c5b9b193162..ebc6a2ede3d 100644
--- a/be/src/olap/storage_engine.h
+++ b/be/src/olap/storage_engine.h
@@ -350,6 +350,8 @@ public:
 
     std::unordered_map<int64_t, std::unique_ptr<TaskWorkerPoolIf>>* workers;
 
+    int64_t get_compaction_num_per_round() const { return 
_compaction_num_per_round; }
+
 private:
     // Instance should be inited from `static open()`
     // MUST NOT be called in other circumstances.
@@ -573,6 +575,8 @@ private:
     scoped_refptr<Thread> _check_delete_bitmap_score_thread;
 
     int64_t _last_get_peers_replica_backends_time_ms {0};
+
+    int64_t _compaction_num_per_round {1};
 };
 
 // lru cache for create tablet round robin in disks
diff --git a/be/src/olap/tablet_manager.cpp b/be/src/olap/tablet_manager.cpp
index 9251791d601..7e40b0e4cbe 100644
--- a/be/src/olap/tablet_manager.cpp
+++ b/be/src/olap/tablet_manager.cpp
@@ -743,6 +743,8 @@ std::vector<TabletSharedPtr> 
TabletManager::find_best_tablets_to_compaction(
     uint32_t single_compact_highest_score = 0;
     TabletSharedPtr best_tablet;
     TabletSharedPtr best_single_compact_tablet;
+    int64_t compaction_num_per_round =
+            
ExecEnv::GetInstance()->storage_engine().to_local().get_compaction_num_per_round();
     auto cmp = [](TabletScore left, TabletScore right) { return left.score > 
right.score; };
     std::priority_queue<TabletScore, std::vector<TabletScore>, decltype(cmp)> 
top_tablets(cmp);
 
@@ -809,20 +811,18 @@ std::vector<TabletSharedPtr> 
TabletManager::find_best_tablets_to_compaction(
             best_single_compact_tablet = tablet_ptr;
         }
 
-        if (config::compaction_num_per_round > 1 && 
!tablet_ptr->should_fetch_from_peer()) {
+        if (compaction_num_per_round > 1 && 
!tablet_ptr->should_fetch_from_peer()) {
             TabletScore ts;
             ts.score = current_compaction_score;
             ts.tablet_ptr = tablet_ptr;
-            if ((top_tablets.size() >= config::compaction_num_per_round &&
+            if ((top_tablets.size() >= compaction_num_per_round &&
                  current_compaction_score > top_tablets.top().score) ||
-                top_tablets.size() < config::compaction_num_per_round) {
+                top_tablets.size() < compaction_num_per_round) {
                 top_tablets.push(ts);
-                if (top_tablets.size() > config::compaction_num_per_round) {
+                if (top_tablets.size() > compaction_num_per_round) {
                     top_tablets.pop();
                 }
-                if (current_compaction_score > highest_score) {
-                    highest_score = current_compaction_score;
-                }
+                highest_score = std::max(current_compaction_score, 
highest_score);
             }
         } else {
             if (current_compaction_score > highest_score && 
!tablet_ptr->should_fetch_from_peer()) {
diff --git a/be/test/olap/compaction_task_test.cpp 
b/be/test/olap/compaction_task_test.cpp
index c00406e6d15..d70d4742362 100644
--- a/be/test/olap/compaction_task_test.cpp
+++ b/be/test/olap/compaction_task_test.cpp
@@ -130,4 +130,157 @@ TEST_F(CompactionTaskTest, TestSubmitCompactionTask) {
     EXPECT_EQ(executing_task_num, 2);
 }
 
+TEST_F(CompactionTaskTest, TestAutoSetCompactionIncreaseTaskNum) {
+    auto st = ThreadPoolBuilder("BaseCompactionTaskThreadPool")
+                      .set_min_threads(2)
+                      .set_max_threads(2)
+                      .build(&_storage_engine->_base_compaction_thread_pool);
+    EXPECT_TRUE(st.ok());
+    st = ThreadPoolBuilder("CumuCompactionTaskThreadPool")
+                 .set_min_threads(2)
+                 .set_max_threads(2)
+                 .build(&_storage_engine->_cumu_compaction_thread_pool);
+    EXPECT_TRUE(st.ok());
+    config::disable_auto_compaction = false;
+
+    auto* sp = SyncPoint::get_instance();
+    sp->enable_processing();
+    sp->set_call_back("olap_server::_generate_compaction_tasks.return_empty", 
[](auto&& values) {
+        auto* ret = try_any_cast_ret<std::vector<TabletSharedPtr>>(values);
+        ret->second = true;
+    });
+    
sp->set_call_back("StorageEngine::_adjust_compaction_thread_num.return_void",
+                      [](auto&& args) { *try_any_cast<bool*>(args.back()) = 
true; });
+    sp->set_call_back("StorageEngine::_compaction_tasks_producer_callback",
+                      [](auto&& values) { 
std::this_thread::sleep_for(std::chrono::seconds(1)); });
+
+    Defer defer {[&]() {
+        _storage_engine->_stop_background_threads_latch.count_down();
+        sp->clear_all_call_backs();
+    }};
+
+    config::generate_compaction_tasks_interval_ms = 1000;
+    {
+        // queue size 1
+        // task num 1->1
+        _storage_engine->_cumu_compaction_thread_pool->_total_queued_tasks = 1;
+        // compaction tasks producer thread
+        st = Thread::create(
+                "StorageEngine", "compaction_tasks_producer_thread",
+                [this]() { 
this->_storage_engine->_compaction_tasks_producer_callback(); },
+                &_storage_engine->_compaction_tasks_producer_thread);
+        EXPECT_TRUE(st.ok());
+        _storage_engine->_stop_background_threads_latch.count_down();
+        sleep(2);
+        EXPECT_EQ(_storage_engine->get_compaction_num_per_round(), 1);
+    }
+    {
+        // queue size 0
+        // task num 4->8
+        _storage_engine->_cumu_compaction_thread_pool->_total_queued_tasks = 0;
+        _storage_engine->_compaction_num_per_round = 4;
+        // compaction tasks producer thread
+        st = Thread::create(
+                "StorageEngine", "compaction_tasks_producer_thread",
+                [this]() { 
this->_storage_engine->_compaction_tasks_producer_callback(); },
+                &_storage_engine->_compaction_tasks_producer_thread);
+        EXPECT_TRUE(st.ok());
+        _storage_engine->_stop_background_threads_latch.count_down();
+        sleep(2);
+        EXPECT_EQ(_storage_engine->get_compaction_num_per_round(), 8);
+    }
+    {
+        // queue size 0
+        // task num 64->64
+        _storage_engine->_cumu_compaction_thread_pool->_total_queued_tasks = 0;
+        _storage_engine->_compaction_num_per_round = 64;
+        // compaction tasks producer thread
+        st = Thread::create(
+                "StorageEngine", "compaction_tasks_producer_thread",
+                [this]() { 
this->_storage_engine->_compaction_tasks_producer_callback(); },
+                &_storage_engine->_compaction_tasks_producer_thread);
+        EXPECT_TRUE(st.ok());
+        _storage_engine->_stop_background_threads_latch.count_down();
+        sleep(2);
+        EXPECT_EQ(_storage_engine->get_compaction_num_per_round(), 64);
+    }
+}
+
+TEST_F(CompactionTaskTest, TestAutoSetCompactionDecreaseTaskNum) {
+    auto st = ThreadPoolBuilder("BaseCompactionTaskThreadPool")
+                      .set_min_threads(2)
+                      .set_max_threads(2)
+                      .build(&_storage_engine->_base_compaction_thread_pool);
+    EXPECT_TRUE(st.ok());
+    st = ThreadPoolBuilder("CumuCompactionTaskThreadPool")
+                 .set_min_threads(2)
+                 .set_max_threads(2)
+                 .build(&_storage_engine->_cumu_compaction_thread_pool);
+    EXPECT_TRUE(st.ok());
+    config::disable_auto_compaction = false;
+
+    auto* sp = SyncPoint::get_instance();
+    sp->enable_processing();
+    sp->set_call_back("olap_server::_generate_compaction_tasks.return_empty", 
[](auto&& values) {
+        auto* ret = try_any_cast_ret<std::vector<TabletSharedPtr>>(values);
+        ret->second = true;
+    });
+    
sp->set_call_back("StorageEngine::_adjust_compaction_thread_num.return_void",
+                      [](auto&& args) { *try_any_cast<bool*>(args.back()) = 
true; });
+    sp->set_call_back("StorageEngine::_compaction_tasks_producer_callback",
+                      [](auto&& values) { 
std::this_thread::sleep_for(std::chrono::seconds(1)); });
+
+    Defer defer {[&]() {
+        _storage_engine->_stop_background_threads_latch.count_down();
+        sp->clear_all_call_backs();
+    }};
+
+    config::generate_compaction_tasks_interval_ms = 1000;
+    {
+        // queue size 3
+        // task num 8->8
+        _storage_engine->_compaction_num_per_round = 8;
+        _storage_engine->_cumu_compaction_thread_pool->_total_queued_tasks = 3;
+        // compaction tasks producer thread
+        st = Thread::create(
+                "StorageEngine", "compaction_tasks_producer_thread",
+                [this]() { 
this->_storage_engine->_compaction_tasks_producer_callback(); },
+                &_storage_engine->_compaction_tasks_producer_thread);
+        EXPECT_TRUE(st.ok());
+        _storage_engine->_stop_background_threads_latch.count_down();
+        sleep(2);
+        EXPECT_EQ(_storage_engine->get_compaction_num_per_round(), 8);
+    }
+    {
+        // queue size 5
+        // task num 8->4
+        _storage_engine->_cumu_compaction_thread_pool->_total_queued_tasks = 5;
+        _storage_engine->_compaction_num_per_round = 8;
+        // compaction tasks producer thread
+        st = Thread::create(
+                "StorageEngine", "compaction_tasks_producer_thread",
+                [this]() { 
this->_storage_engine->_compaction_tasks_producer_callback(); },
+                &_storage_engine->_compaction_tasks_producer_thread);
+        EXPECT_TRUE(st.ok());
+        _storage_engine->_stop_background_threads_latch.count_down();
+        sleep(2);
+        EXPECT_EQ(_storage_engine->get_compaction_num_per_round(), 4);
+    }
+    {
+        // queue size 1
+        // task num 1->1
+        _storage_engine->_cumu_compaction_thread_pool->_total_queued_tasks = 1;
+        _storage_engine->_compaction_num_per_round = 1;
+        // compaction tasks producer thread
+        st = Thread::create(
+                "StorageEngine", "compaction_tasks_producer_thread",
+                [this]() { 
this->_storage_engine->_compaction_tasks_producer_callback(); },
+                &_storage_engine->_compaction_tasks_producer_thread);
+        EXPECT_TRUE(st.ok());
+        _storage_engine->_stop_background_threads_latch.count_down();
+        sleep(2);
+        EXPECT_EQ(_storage_engine->get_compaction_num_per_round(), 1);
+    }
+}
+
 } // namespace doris
diff --git a/be/test/olap/tablet_mgr_test.cpp b/be/test/olap/tablet_mgr_test.cpp
index 7b940ffa0fa..4096c9ed86e 100644
--- a/be/test/olap/tablet_mgr_test.cpp
+++ b/be/test/olap/tablet_mgr_test.cpp
@@ -73,7 +73,9 @@ public:
         EngineOptions options;
         // won't open engine, options.path is needless
         options.backend_uid = UniqueId::gen_uid();
-        k_engine = std::make_unique<StorageEngine>(options);
+        auto engine = std::make_unique<StorageEngine>(options);
+        ExecEnv::GetInstance()->set_storage_engine(std::move(engine));
+        k_engine = &ExecEnv::GetInstance()->storage_engine().to_local();
         _data_dir = new DataDir(*k_engine, _engine_data_path, 1000000000);
         static_cast<void>(_data_dir->init());
         _tablet_mgr = k_engine->tablet_manager();
@@ -82,10 +84,11 @@ public:
     virtual void TearDown() {
         SAFE_DELETE(_data_dir);
         
EXPECT_TRUE(io::global_local_filesystem()->delete_directory(_engine_data_path).ok());
+        ExecEnv::GetInstance()->set_storage_engine(nullptr);
         _tablet_mgr = nullptr;
         config::compaction_num_per_round = 1;
     }
-    std::unique_ptr<StorageEngine> k_engine;
+    StorageEngine* k_engine;
 
 private:
     DataDir* _data_dir = nullptr;
@@ -470,7 +473,7 @@ TEST_F(TabletMgrTest, FindTabletWithCompact) {
     }
 
     {
-        config::compaction_num_per_round = 10;
+        k_engine->_compaction_num_per_round = 10;
         for (int64_t i = 1; i <= 100; ++i) {
             create_tablet(10000 + i, false, i);
         }
@@ -488,7 +491,7 @@ TEST_F(TabletMgrTest, FindTabletWithCompact) {
                       100 - index - 1);
             index++;
         }
-        config::compaction_num_per_round = 1;
+        k_engine->_compaction_num_per_round = 1;
         // drop all tablets
         for (int64_t id = 10001; id <= 10100; ++id) {
             Status drop_st = _tablet_mgr->drop_tablet(id, id * 10, false);
@@ -497,7 +500,7 @@ TEST_F(TabletMgrTest, FindTabletWithCompact) {
     }
 
     {
-        config::compaction_num_per_round = 10;
+        k_engine->_compaction_num_per_round = 10;
         for (int64_t i = 1; i <= 100; ++i) {
             create_tablet(20000 + i, false, i);
         }
@@ -520,7 +523,7 @@ TEST_F(TabletMgrTest, FindTabletWithCompact) {
                           
cumulative_compaction_policies[CUMULATIVE_SIZE_BASED_POLICY]),
                   200 - 1);
 
-        config::compaction_num_per_round = 1;
+        k_engine->_compaction_num_per_round = 1;
         // drop all tablets
         for (int64_t id = 20001; id <= 20100; ++id) {
             Status drop_st = _tablet_mgr->drop_tablet(id, id * 10, false);
@@ -532,7 +535,7 @@ TEST_F(TabletMgrTest, FindTabletWithCompact) {
     }
 
     {
-        config::compaction_num_per_round = 10;
+        k_engine->_compaction_num_per_round = 10;
         for (int64_t i = 1; i <= 5; ++i) {
             create_tablet(30000 + i, false, i + 5);
         }
@@ -549,7 +552,7 @@ TEST_F(TabletMgrTest, FindTabletWithCompact) {
                       10 - i - 1);
         }
 
-        config::compaction_num_per_round = 1;
+        k_engine->_compaction_num_per_round = 1;
         // drop all tablets
         for (int64_t id = 30001; id <= 30005; ++id) {
             Status drop_st = _tablet_mgr->drop_tablet(id, id * 10, false);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to