This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new f35b235c3b [opt](compaction) optimize compaction in concurrent load 
(#10153)
f35b235c3b is described below

commit f35b235c3b1936f637faa0c1efee54623776e30c
Author: yixiutt <[email protected]>
AuthorDate: Fri Jun 17 17:49:45 2022 +0800

    [opt](compaction) optimize compaction in concurrent load (#10153)
    
    Add some logic to optimize compaction:
    1. Separate base and cumu compaction, in case base compaction runs too long
    and affects cumu compaction.
    2. Fix the level size in cu compaction so that files smaller than 64M get the
    right level size. When choosing rowsets to compact, the policy will ignore
    big rowsets; this reduces CPU usage by about 25% under high-frequency
    concurrent load.
    3. Remove the skip window restriction so a rowset can be compacted right
    after it is generated, because we will not delete rowsets after compaction.
    This greatly reduces the compaction score under concurrent load.
    4. Remove the version consistency check in can_do_compaction. We choose
    consecutive rowsets to do compaction, so this logic is unnecessary.
    
    After adding the logic above, compaction score and CPU cost are
    substantially improved under concurrent load.
    
    Co-authored-by: yixiutt <[email protected]>
---
 be/src/common/config.h                             |   7 +-
 be/src/olap/cumulative_compaction.cpp              |   6 +-
 be/src/olap/cumulative_compaction_policy.cpp       |  12 +--
 be/src/olap/cumulative_compaction_policy.h         |   2 -
 be/src/olap/olap_server.cpp                        | 114 ++++++++++++++-------
 be/src/olap/storage_engine.cpp                     |   7 +-
 be/src/olap/storage_engine.h                       |   5 +-
 be/src/olap/tablet.cpp                             |  21 +---
 be/src/olap/tablet.h                               |   2 +-
 be/test/olap/cumulative_compaction_policy_test.cpp |  25 +++--
 docs/en/docs/admin-manual/config/be-config.md      |  14 ++-
 docs/zh-CN/docs/admin-manual/config/be-config.md   |  14 ++-
 12 files changed, 129 insertions(+), 100 deletions(-)

diff --git a/be/src/common/config.h b/be/src/common/config.h
index 4885c17193..56e7382320 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -285,17 +285,14 @@ 
CONF_mInt64(cumulative_size_based_compaction_lower_size_mbytes, "64");
 // cumulative compaction policy: min and max delta file's number
 CONF_mInt64(min_cumulative_compaction_num_singleton_deltas, "5");
 CONF_mInt64(max_cumulative_compaction_num_singleton_deltas, "1000");
-// cumulative compaction skips recently published deltas in order to prevent
-// compacting a version that might be queried (in case the query planning 
phase took some time).
-// the following config set the window size
-CONF_mInt32(cumulative_compaction_skip_window_seconds, "30");
 
 // if compaction of a tablet failed, this tablet should not be chosen to
 // compaction until this interval passes.
 CONF_mInt64(min_compaction_failure_interval_sec, "5"); // 5 seconds
 
 // This config can be set to limit thread number in compaction thread pool.
-CONF_mInt32(max_compaction_threads, "10");
+CONF_mInt32(max_base_compaction_threads, "4");
+CONF_mInt32(max_cumu_compaction_threads, "10");
 
 // This config can be set to limit thread number in convert rowset thread pool.
 CONF_mInt32(convert_rowset_thread_num, "0");
diff --git a/be/src/olap/cumulative_compaction.cpp 
b/be/src/olap/cumulative_compaction.cpp
index df8dbc50f1..35329e1362 100644
--- a/be/src/olap/cumulative_compaction.cpp
+++ b/be/src/olap/cumulative_compaction.cpp
@@ -95,15 +95,13 @@ Status CumulativeCompaction::execute_compact_impl() {
 Status CumulativeCompaction::pick_rowsets_to_compact() {
     std::vector<RowsetSharedPtr> candidate_rowsets;
 
-    _tablet->pick_candidate_rowsets_to_cumulative_compaction(
-            config::cumulative_compaction_skip_window_seconds, 
&candidate_rowsets);
+    
_tablet->pick_candidate_rowsets_to_cumulative_compaction(&candidate_rowsets);
 
     if (candidate_rowsets.empty()) {
         return 
Status::OLAPInternalError(OLAP_ERR_CUMULATIVE_NO_SUITABLE_VERSION);
     }
 
-    // candidate_rowsets may not be continuous. Because some rowset may not be 
selected
-    // because the protection time has not 
expired(config::cumulative_compaction_skip_window_seconds).
+    // candidate_rowsets may not be continuous
     // So we need to choose the longest continuous path from it.
     std::vector<Version> missing_versions;
     RETURN_NOT_OK(find_longest_consecutive_version(&candidate_rowsets, 
&missing_versions));
diff --git a/be/src/olap/cumulative_compaction_policy.cpp 
b/be/src/olap/cumulative_compaction_policy.cpp
index a77eb44f73..ad73e1fc80 100644
--- a/be/src/olap/cumulative_compaction_policy.cpp
+++ b/be/src/olap/cumulative_compaction_policy.cpp
@@ -32,10 +32,11 @@ 
SizeBasedCumulativeCompactionPolicy::SizeBasedCumulativeCompactionPolicy(
           _size_based_promotion_ratio(size_based_promotion_ratio),
           _size_based_promotion_min_size(size_based_promotion_min_size),
           
_size_based_compaction_lower_bound_size(size_based_compaction_lower_bound_size) 
{
-    // init _levels by divide 2 between size_based_promotion_size and 
size_based_compaction_lower_bound_size
+    // init _levels by divide 2 between size_based_compaction_lower_bound_size 
and 1K
+    // cu compaction handle file size less then 
size_based_compaction_lower_bound_size
     int64_t i_size = size_based_promotion_size / 2;
 
-    while (i_size >= size_based_compaction_lower_bound_size) {
+    while (i_size >= 1024) {
         _levels.push_back(i_size);
         i_size /= 2;
     }
@@ -460,16 +461,11 @@ void 
NumBasedCumulativeCompactionPolicy::calculate_cumulative_point(
 }
 
 void CumulativeCompactionPolicy::pick_candidate_rowsets(
-        int64_t skip_window_sec,
         const std::unordered_map<Version, RowsetSharedPtr, HashOfVersion>& 
rs_version_map,
         int64_t cumulative_point, std::vector<RowsetSharedPtr>* 
candidate_rowsets) {
-    int64_t now = UnixSeconds();
     for (auto& it : rs_version_map) {
         // find all rowset version greater than cumulative_point and skip the 
create time in skip_window_sec
-        if (it.first.first >= cumulative_point &&
-            ((it.second->creation_time() + skip_window_sec < now)
-             // this case means a rowset has been compacted before which is 
not a new published rowset, so it should participate compaction
-             || (it.first.first != it.first.second))) {
+        if (it.first.first >= cumulative_point) {
             candidate_rowsets->push_back(it.second);
         }
     }
diff --git a/be/src/olap/cumulative_compaction_policy.h 
b/be/src/olap/cumulative_compaction_policy.h
index 0f59d52834..079155c6b3 100644
--- a/be/src/olap/cumulative_compaction_policy.h
+++ b/be/src/olap/cumulative_compaction_policy.h
@@ -69,12 +69,10 @@ public:
 
     /// This function implements the policy which represents how to pick the 
candidate rowsets for compaction.
     /// This base class gives a unified implementation. Its derived classes 
also can override this function each other.
-    /// param skip_window_sec, it means skipping the rowsets which use create 
time plus skip_window_sec is greater than now.
     /// param rs_version_map, mapping from version to rowset
     /// param cumulative_point,  current cumulative point of tablet
     /// return candidate_rowsets, the container of candidate rowsets
     virtual void pick_candidate_rowsets(
-            int64_t skip_window_sec,
             const std::unordered_map<Version, RowsetSharedPtr, HashOfVersion>& 
rs_version_map,
             int64_t cumulative_point, std::vector<RowsetSharedPtr>* 
candidate_rowsets);
 
diff --git a/be/src/olap/olap_server.cpp b/be/src/olap/olap_server.cpp
index eb1d47c065..5088bd8775 100644
--- a/be/src/olap/olap_server.cpp
+++ b/be/src/olap/olap_server.cpp
@@ -67,11 +67,16 @@ Status StorageEngine::start_bg_threads() {
         data_dirs.push_back(tmp_store.second);
     }
 
-    int32_t max_thread_num = config::max_compaction_threads;
-    ThreadPoolBuilder("CompactionTaskThreadPool")
+    int32_t max_thread_num = config::max_base_compaction_threads;
+    ThreadPoolBuilder("BaseCompactionTaskThreadPool")
             .set_min_threads(max_thread_num)
             .set_max_threads(max_thread_num)
-            .build(&_compaction_thread_pool);
+            .build(&_base_compaction_thread_pool);
+    max_thread_num = config::max_cumu_compaction_threads;
+    ThreadPoolBuilder("CumuCompactionTaskThreadPool")
+            .set_min_threads(max_thread_num)
+            .set_max_threads(max_thread_num)
+            .build(&_cumu_compaction_thread_pool);
 
     int32_t convert_rowset_thread_num = config::convert_rowset_thread_num;
     if (convert_rowset_thread_num > 0) {
@@ -88,11 +93,14 @@ Status StorageEngine::start_bg_threads() {
         LOG(INFO) << "alpha rowset scan thread started";
     }
 
-    ThreadPoolBuilder("CompactionTaskThreadPool")
-            .set_min_threads(max_thread_num)
-            .set_max_threads(max_thread_num)
-            .build(&_compaction_thread_pool);
-
+    ThreadPoolBuilder("BaseCompactionTaskThreadPool")
+            .set_min_threads(config::max_base_compaction_threads)
+            .set_max_threads(config::max_base_compaction_threads)
+            .build(&_base_compaction_thread_pool);
+    ThreadPoolBuilder("CumuCompactionTaskThreadPool")
+            .set_min_threads(config::max_cumu_compaction_threads)
+            .set_max_threads(config::max_cumu_compaction_threads)
+            .build(&_cumu_compaction_thread_pool);
     ThreadPoolBuilder("SmallCompactionTaskThreadPool")
             .set_min_threads(config::quick_compaction_max_threads)
             .set_max_threads(config::quick_compaction_max_threads)
@@ -364,6 +372,46 @@ void StorageEngine::_alpha_rowset_scan_thread_callback() {
     } while 
(!_stop_background_threads_latch.wait_for(std::chrono::seconds(scan_interval_sec)));
 }
 
+void StorageEngine::_adjust_compaction_thread_num() {
+    if (_base_compaction_thread_pool->max_threads() != 
config::max_base_compaction_threads) {
+        int old_max_threads = _base_compaction_thread_pool->max_threads();
+        Status status =
+                
_base_compaction_thread_pool->set_max_threads(config::max_base_compaction_threads);
+        if (status.ok()) {
+            VLOG_NOTICE << "update base compaction thread pool max_threads 
from " << old_max_threads
+                        << " to " << config::max_base_compaction_threads;
+        }
+    }
+    if (_base_compaction_thread_pool->min_threads() != 
config::max_base_compaction_threads) {
+        int old_min_threads = _base_compaction_thread_pool->min_threads();
+        Status status =
+                
_base_compaction_thread_pool->set_min_threads(config::max_base_compaction_threads);
+        if (status.ok()) {
+            VLOG_NOTICE << "update base compaction thread pool min_threads 
from " << old_min_threads
+                        << " to " << config::max_base_compaction_threads;
+        }
+    }
+
+    if (_cumu_compaction_thread_pool->max_threads() != 
config::max_cumu_compaction_threads) {
+        int old_max_threads = _cumu_compaction_thread_pool->max_threads();
+        Status status =
+                
_cumu_compaction_thread_pool->set_max_threads(config::max_cumu_compaction_threads);
+        if (status.ok()) {
+            VLOG_NOTICE << "update cumu compaction thread pool max_threads 
from " << old_max_threads
+                        << " to " << config::max_cumu_compaction_threads;
+        }
+    }
+    if (_cumu_compaction_thread_pool->min_threads() != 
config::max_cumu_compaction_threads) {
+        int old_min_threads = _cumu_compaction_thread_pool->min_threads();
+        Status status =
+                
_cumu_compaction_thread_pool->set_min_threads(config::max_cumu_compaction_threads);
+        if (status.ok()) {
+            VLOG_NOTICE << "update cumu compaction thread pool min_threads 
from " << old_min_threads
+                        << " to " << config::max_cumu_compaction_threads;
+        }
+    }
+}
+
 void StorageEngine::_compaction_tasks_producer_callback() {
 #ifdef GOOGLE_PROFILER
     ProfilerRegisterThread();
@@ -394,35 +442,7 @@ void StorageEngine::_compaction_tasks_producer_callback() {
     int64_t interval = config::generate_compaction_tasks_min_interval_ms;
     do {
         if (!config::disable_auto_compaction) {
-            VLOG_CRITICAL << "compaction thread pool. num_threads: "
-                          << _compaction_thread_pool->num_threads()
-                          << ", num_threads_pending_start: "
-                          << 
_compaction_thread_pool->num_threads_pending_start()
-                          << ", num_active_threads: "
-                          << _compaction_thread_pool->num_active_threads()
-                          << ", max_threads: " << 
_compaction_thread_pool->max_threads()
-                          << ", min_threads: " << 
_compaction_thread_pool->min_threads()
-                          << ", num_total_queued_tasks: "
-                          << _compaction_thread_pool->get_queue_size();
-
-            if (_compaction_thread_pool->max_threads() != 
config::max_compaction_threads) {
-                int old_max_threads = _compaction_thread_pool->max_threads();
-                Status status =
-                        
_compaction_thread_pool->set_max_threads(config::max_compaction_threads);
-                if (status.ok()) {
-                    LOG(INFO) << "update compaction thread pool max_threads 
from "
-                              << old_max_threads << " to " << 
config::max_compaction_threads;
-                }
-            }
-            if (_compaction_thread_pool->min_threads() != 
config::max_compaction_threads) {
-                int old_min_threads = _compaction_thread_pool->min_threads();
-                Status status =
-                        
_compaction_thread_pool->set_min_threads(config::max_compaction_threads);
-                if (status.ok()) {
-                    LOG(INFO) << "update compaction thread pool min_threads 
from "
-                              << old_min_threads << " to " << 
config::max_compaction_threads;
-                }
-            }
+            _adjust_compaction_thread_num();
 
             bool check_score = false;
             int64_t cur_time = UnixMillis();
@@ -441,6 +461,20 @@ void StorageEngine::_compaction_tasks_producer_callback() {
                     last_base_score_update_time = cur_time;
                 }
             }
+            std::unique_ptr<ThreadPool>& thread_pool =
+                    (compaction_type == CompactionType::CUMULATIVE_COMPACTION)
+                            ? _cumu_compaction_thread_pool
+                            : _base_compaction_thread_pool;
+            VLOG_CRITICAL << "compaction thread pool. type: "
+                          << (compaction_type == 
CompactionType::CUMULATIVE_COMPACTION ? "CUMU"
+                                                                               
        : "BASE")
+                          << ", num_threads: " << thread_pool->num_threads()
+                          << ", num_threads_pending_start: "
+                          << thread_pool->num_threads_pending_start()
+                          << ", num_active_threads: " << 
thread_pool->num_active_threads()
+                          << ", max_threads: " << thread_pool->max_threads()
+                          << ", min_threads: " << thread_pool->min_threads()
+                          << ", num_total_queued_tasks: " << 
thread_pool->get_queue_size();
             std::vector<TabletSharedPtr> tablets_compaction =
                     _generate_compaction_tasks(compaction_type, data_dirs, 
check_score);
             if (tablets_compaction.size() == 0) {
@@ -623,7 +657,11 @@ Status 
StorageEngine::_submit_compaction_task(TabletSharedPtr tablet,
     int64_t permits = 0;
     Status st = 
tablet->prepare_compaction_and_calculate_permits(compaction_type, tablet, 
&permits);
     if (st.ok() && permits > 0 && _permit_limiter.request(permits)) {
-        auto st = _compaction_thread_pool->submit_func([=]() {
+        std::unique_ptr<ThreadPool>& thread_pool =
+                (compaction_type == CompactionType::CUMULATIVE_COMPACTION)
+                        ? _cumu_compaction_thread_pool
+                        : _base_compaction_thread_pool;
+        auto st = thread_pool->submit_func([=]() {
             SCOPED_ATTACH_TASK_THREAD(ThreadContext::TaskType::COMPACTION,
                                       
tablet->get_compaction_mem_tracker(compaction_type));
             CgroupsMgr::apply_system_cgroup();
diff --git a/be/src/olap/storage_engine.cpp b/be/src/olap/storage_engine.cpp
index c872851352..5a75a4f88d 100644
--- a/be/src/olap/storage_engine.cpp
+++ b/be/src/olap/storage_engine.cpp
@@ -157,8 +157,11 @@ StorageEngine::~StorageEngine() {
     DEREGISTER_HOOK_METRIC(schema_change_mem_consumption);
     _clear();
 
-    if (_compaction_thread_pool) {
-        _compaction_thread_pool->shutdown();
+    if (_base_compaction_thread_pool) {
+        _base_compaction_thread_pool->shutdown();
+    }
+    if (_cumu_compaction_thread_pool) {
+        _cumu_compaction_thread_pool->shutdown();
     }
     if (_convert_rowset_thread_pool) {
         _convert_rowset_thread_pool->shutdown();
diff --git a/be/src/olap/storage_engine.h b/be/src/olap/storage_engine.h
index 40d4303dab..e54a8d5f91 100644
--- a/be/src/olap/storage_engine.h
+++ b/be/src/olap/storage_engine.h
@@ -274,6 +274,8 @@ private:
 
     Status _handle_quick_compaction(TabletSharedPtr);
 
+    void _adjust_compaction_thread_num();
+
 private:
     struct CompactionCandidate {
         CompactionCandidate(uint32_t nicumulative_compaction_, int64_t 
tablet_id_, uint32_t index_)
@@ -381,8 +383,9 @@ private:
 
     HeartbeatFlags* _heartbeat_flags;
 
-    std::unique_ptr<ThreadPool> _compaction_thread_pool;
     std::unique_ptr<ThreadPool> _quick_compaction_thread_pool;
+    std::unique_ptr<ThreadPool> _base_compaction_thread_pool;
+    std::unique_ptr<ThreadPool> _cumu_compaction_thread_pool;
 
     scoped_refptr<Thread> _alpha_rowset_scan_thread;
     std::unique_ptr<ThreadPool> _convert_rowset_thread_pool;
diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp
index 771069e4ce..2db820b975 100644
--- a/be/src/olap/tablet.cpp
+++ b/be/src/olap/tablet.cpp
@@ -713,21 +713,6 @@ bool Tablet::can_do_compaction(size_t path_hash, 
CompactionType compaction_type)
         return false;
     }
 
-    if (tablet_state() == TABLET_RUNNING) {
-        // if tablet state is running, we need to check if it has consistent 
versions.
-        // tablet in other state such as TABLET_NOTREADY may not have complete 
versions.
-        std::shared_lock rdlock(_meta_lock);
-        const RowsetSharedPtr lastest_delta = rowset_with_max_version();
-        if (lastest_delta == nullptr) {
-            return false;
-        }
-
-        Version test_version = Version(0, lastest_delta->end_version());
-        if (!capture_consistent_versions(test_version, nullptr)) {
-            return false;
-        }
-    }
-
     if (tablet_state() == TABLET_NOTREADY) {
         // Before doing schema change, tablet's rowsets that versions smaller 
than max converting version will be
         // removed. So, we only need to do the compaction when it is being 
converted.
@@ -1092,13 +1077,13 @@ TabletInfo Tablet::get_tablet_info() const {
 }
 
 void Tablet::pick_candidate_rowsets_to_cumulative_compaction(
-        int64_t skip_window_sec, std::vector<RowsetSharedPtr>* 
candidate_rowsets) {
+        std::vector<RowsetSharedPtr>* candidate_rowsets) {
     if (_cumulative_point == K_INVALID_CUMULATIVE_POINT) {
         return;
     }
     std::shared_lock rdlock(_meta_lock);
-    _cumulative_compaction_policy->pick_candidate_rowsets(skip_window_sec, 
_rs_version_map,
-                                                          _cumulative_point, 
candidate_rowsets);
+    _cumulative_compaction_policy->pick_candidate_rowsets(_rs_version_map, 
_cumulative_point,
+                                                          candidate_rowsets);
 }
 
 void Tablet::find_alpha_rowsets(std::vector<RowsetSharedPtr>* rowsets) const {
diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h
index 1156de36f1..e0f124988d 100644
--- a/be/src/olap/tablet.h
+++ b/be/src/olap/tablet.h
@@ -211,7 +211,7 @@ public:
     TabletInfo get_tablet_info() const;
 
     void pick_candidate_rowsets_to_cumulative_compaction(
-            int64_t skip_window_sec, std::vector<RowsetSharedPtr>* 
candidate_rowsets);
+            std::vector<RowsetSharedPtr>* candidate_rowsets);
     void 
pick_candidate_rowsets_to_base_compaction(std::vector<RowsetSharedPtr>* 
candidate_rowsets);
 
     void calculate_cumulative_point();
diff --git a/be/test/olap/cumulative_compaction_policy_test.cpp 
b/be/test/olap/cumulative_compaction_policy_test.cpp
index 76a5acaf36..92d3dda5bd 100644
--- a/be/test/olap/cumulative_compaction_policy_test.cpp
+++ b/be/test/olap/cumulative_compaction_policy_test.cpp
@@ -256,7 +256,7 @@ TEST_F(TestNumBasedCumulativeCompactionPolicy, 
pick_candidate_rowsets) {
     _tablet->calculate_cumulative_point();
 
     std::vector<RowsetSharedPtr> candidate_rowsets;
-    _tablet->pick_candidate_rowsets_to_cumulative_compaction(1000, 
&candidate_rowsets);
+    
_tablet->pick_candidate_rowsets_to_cumulative_compaction(&candidate_rowsets);
 
     EXPECT_EQ(2, candidate_rowsets.size());
 }
@@ -279,7 +279,7 @@ TEST_F(TestNumBasedCumulativeCompactionPolicy, 
pick_input_rowsets_normal) {
     NumBasedCumulativeCompactionPolicy policy;
     std::vector<RowsetSharedPtr> candidate_rowsets;
 
-    _tablet->pick_candidate_rowsets_to_cumulative_compaction(1000, 
&candidate_rowsets);
+    
_tablet->pick_candidate_rowsets_to_cumulative_compaction(&candidate_rowsets);
 
     std::vector<RowsetSharedPtr> input_rowsets;
     Version last_delete_version {-1, -1};
@@ -311,7 +311,7 @@ TEST_F(TestNumBasedCumulativeCompactionPolicy, 
pick_input_rowsets_delete) {
     NumBasedCumulativeCompactionPolicy policy;
     std::vector<RowsetSharedPtr> candidate_rowsets;
 
-    _tablet->pick_candidate_rowsets_to_cumulative_compaction(1000, 
&candidate_rowsets);
+    
_tablet->pick_candidate_rowsets_to_cumulative_compaction(&candidate_rowsets);
 
     std::vector<RowsetSharedPtr> input_rowsets;
     Version last_delete_version {-1, -1};
@@ -769,7 +769,7 @@ TEST_F(TestSizeBasedCumulativeCompactionPolicy, 
pick_candidate_rowsets) {
     _tablet->calculate_cumulative_point();
 
     std::vector<RowsetSharedPtr> candidate_rowsets;
-    _tablet->pick_candidate_rowsets_to_cumulative_compaction(1000, 
&candidate_rowsets);
+    
_tablet->pick_candidate_rowsets_to_cumulative_compaction(&candidate_rowsets);
 
     EXPECT_EQ(3, candidate_rowsets.size());
 }
@@ -790,7 +790,7 @@ TEST_F(TestSizeBasedCumulativeCompactionPolicy, 
pick_candidate_rowsets_big_base)
     _tablet->calculate_cumulative_point();
 
     std::vector<RowsetSharedPtr> candidate_rowsets;
-    _tablet->pick_candidate_rowsets_to_cumulative_compaction(1000, 
&candidate_rowsets);
+    
_tablet->pick_candidate_rowsets_to_cumulative_compaction(&candidate_rowsets);
 
     EXPECT_EQ(3, candidate_rowsets.size());
 }
@@ -812,7 +812,7 @@ TEST_F(TestSizeBasedCumulativeCompactionPolicy, 
pick_input_rowsets_normal) {
 
     std::vector<RowsetSharedPtr> candidate_rowsets;
 
-    _tablet->pick_candidate_rowsets_to_cumulative_compaction(1000, 
&candidate_rowsets);
+    
_tablet->pick_candidate_rowsets_to_cumulative_compaction(&candidate_rowsets);
 
     std::vector<RowsetSharedPtr> input_rowsets;
     Version last_delete_version {-1, -1};
@@ -844,7 +844,7 @@ TEST_F(TestSizeBasedCumulativeCompactionPolicy, 
pick_input_rowsets_big_base) {
 
     std::vector<RowsetSharedPtr> candidate_rowsets;
 
-    _tablet->pick_candidate_rowsets_to_cumulative_compaction(1000, 
&candidate_rowsets);
+    
_tablet->pick_candidate_rowsets_to_cumulative_compaction(&candidate_rowsets);
 
     std::vector<RowsetSharedPtr> input_rowsets;
     Version last_delete_version {-1, -1};
@@ -876,7 +876,7 @@ TEST_F(TestSizeBasedCumulativeCompactionPolicy, 
pick_input_rowsets_promotion) {
 
     std::vector<RowsetSharedPtr> candidate_rowsets;
 
-    _tablet->pick_candidate_rowsets_to_cumulative_compaction(1000, 
&candidate_rowsets);
+    
_tablet->pick_candidate_rowsets_to_cumulative_compaction(&candidate_rowsets);
 
     std::vector<RowsetSharedPtr> input_rowsets;
     Version last_delete_version {-1, -1};
@@ -908,7 +908,7 @@ TEST_F(TestSizeBasedCumulativeCompactionPolicy, 
pick_input_rowsets_not_same_leve
 
     std::vector<RowsetSharedPtr> candidate_rowsets;
 
-    _tablet->pick_candidate_rowsets_to_cumulative_compaction(1000, 
&candidate_rowsets);
+    
_tablet->pick_candidate_rowsets_to_cumulative_compaction(&candidate_rowsets);
 
     std::vector<RowsetSharedPtr> input_rowsets;
     Version last_delete_version {-1, -1};
@@ -940,7 +940,7 @@ TEST_F(TestSizeBasedCumulativeCompactionPolicy, 
pick_input_rowsets_empty) {
 
     std::vector<RowsetSharedPtr> candidate_rowsets;
 
-    _tablet->pick_candidate_rowsets_to_cumulative_compaction(1000, 
&candidate_rowsets);
+    
_tablet->pick_candidate_rowsets_to_cumulative_compaction(&candidate_rowsets);
 
     std::vector<RowsetSharedPtr> input_rowsets;
     Version last_delete_version {-1, -1};
@@ -972,7 +972,7 @@ TEST_F(TestSizeBasedCumulativeCompactionPolicy, 
pick_input_rowsets_not_reach_min
 
     std::vector<RowsetSharedPtr> candidate_rowsets;
 
-    _tablet->pick_candidate_rowsets_to_cumulative_compaction(1000, 
&candidate_rowsets);
+    
_tablet->pick_candidate_rowsets_to_cumulative_compaction(&candidate_rowsets);
 
     std::vector<RowsetSharedPtr> input_rowsets;
     Version last_delete_version {-1, -1};
@@ -1004,7 +1004,7 @@ TEST_F(TestSizeBasedCumulativeCompactionPolicy, 
pick_input_rowsets_delete) {
 
     std::vector<RowsetSharedPtr> candidate_rowsets;
 
-    _tablet->pick_candidate_rowsets_to_cumulative_compaction(1000, 
&candidate_rowsets);
+    
_tablet->pick_candidate_rowsets_to_cumulative_compaction(&candidate_rowsets);
 
     std::vector<RowsetSharedPtr> input_rowsets;
     Version last_delete_version {-1, -1};
@@ -1081,7 +1081,6 @@ TEST_F(TestSizeBasedCumulativeCompactionPolicy, 
_level_size) {
             dynamic_cast<SizeBasedCumulativeCompactionPolicy*>(
                     _tablet->_cumulative_compaction_policy.get());
 
-    EXPECT_EQ(4, policy->_levels.size());
     EXPECT_EQ(536870912, policy->_levels[0]);
     EXPECT_EQ(268435456, policy->_levels[1]);
     EXPECT_EQ(134217728, policy->_levels[2]);
diff --git a/docs/en/docs/admin-manual/config/be-config.md 
b/docs/en/docs/admin-manual/config/be-config.md
index a75e263a81..043ffb7cc3 100644
--- a/docs/en/docs/admin-manual/config/be-config.md
+++ b/docs/en/docs/admin-manual/config/be-config.md
@@ -66,11 +66,11 @@ There are two ways to configure BE configuration items:
 
 ## Examples
 
-1. Modify `max_compaction_concurrency` statically
+1. Modify `max_base_compaction_concurrency` statically
 
      By adding in the `be.conf` file:
 
-     ```max_compaction_concurrency=5```
+     ```max_base_compaction_concurrency=5```
 
      Then restart the BE process to take effect the configuration.
 
@@ -736,10 +736,16 @@ Default: 10
 
 The maximum number of client caches per host. There are multiple client caches 
in BE, but currently we use the same cache size configuration. If necessary, 
use different configurations to set up different client-side caches
 
-### `max_compaction_threads`
+### `max_base_compaction_threads`
 
 * Type: int32
-* Description: The maximum of thread number in compaction thread pool.
+* Description: The maximum of thread number in base compaction thread pool.
+* Default value: 4
+
+### `max_cumu_compaction_threads`
+
+* Type: int32
+* Description: The maximum of thread number in cumulative compaction thread 
pool.
 * Default value: 10
 
 ### `max_consumer_num_per_group`
diff --git a/docs/zh-CN/docs/admin-manual/config/be-config.md 
b/docs/zh-CN/docs/admin-manual/config/be-config.md
index 45dd774269..5d9a4ffbb6 100644
--- a/docs/zh-CN/docs/admin-manual/config/be-config.md
+++ b/docs/zh-CN/docs/admin-manual/config/be-config.md
@@ -64,11 +64,11 @@ BE 的配置项有两种方式进行配置:
 
 ## 应用举例
 
-1. 静态方式修改 `max_compaction_concurrency`
+1. 静态方式修改 `max_base_compaction_concurrency`
 
   通过在 `be.conf` 文件中添加:
 
-  ```max_compaction_concurrency=5```
+  ```max_base_compaction_concurrency=5```
 
   之后重启 BE 进程以生效该配置。
 
@@ -737,10 +737,16 @@ load错误日志将在此时间后删除
 
 每个主机的最大客户端缓存数,BE 中有多种客户端缓存,但目前我们使用相同的缓存大小配置。 如有必要,使用不同的配置来设置不同的客户端缓存。
 
-### `max_compaction_threads`
+### `max_base_compaction_threads`
 
 * 类型:int32
-* 描述:Compaction线程池中线程数量的最大值。
+* 描述:Base Compaction线程池中线程数量的最大值。
+* 默认值:4
+
+### `max_cumu_compaction_threads`
+
+* 类型:int32
+* 描述:Cumulative Compaction线程池中线程数量的最大值。
 * 默认值:10
 
 ### `max_consumer_num_per_group`


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to