github-actions[bot] commented on code in PR #31215:
URL: https://github.com/apache/doris/pull/31215#discussion_r1497123801


##########
be/src/cloud/cloud_cumulative_compaction_policy.cpp:
##########
@@ -0,0 +1,205 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "cloud/cloud_cumulative_compaction_policy.h"
+
+#include <algorithm>
+#include <list>
+#include <ostream>
+#include <string>
+
+#include "common/config.h"
+#include "common/logging.h"
+#include "common/sync_point.h"
+#include "cloud/config.h"
+#include "olap/olap_common.h"
+#include "olap/tablet.h"
+#include "olap/tablet_meta.h"
+
+namespace doris {
+
+CloudSizeBasedCumulativeCompactionPolicy::CloudSizeBasedCumulativeCompactionPolicy(
+        int64_t promotion_size, double promotion_ratio, int64_t 
promotion_min_size,
+        int64_t compaction_min_size)
+        : _promotion_size(promotion_size),
+          _promotion_ratio(promotion_ratio),
+          _promotion_min_size(promotion_min_size),
+          _compaction_min_size(compaction_min_size) {}
+
+int64_t CloudSizeBasedCumulativeCompactionPolicy::_level_size(const int64_t 
size) {
+    if (size < 1024) return 0;

Review Comment:
   warning: statement should be inside braces 
[readability-braces-around-statements]
   
   ```suggestion
       if (size < 1024) { return 0;
   }
   ```
   



##########
be/src/cloud/cloud_cumulative_compaction.cpp:
##########
@@ -0,0 +1,457 @@
+#include "cloud/cloud_cumulative_compaction.h"
+
+#include "cloud/config.h"
+#include "cloud/cloud_meta_mgr.h"
+#include "common/config.h"
+#include "common/logging.h"
+#include "common/status.h"
+#include "common/sync_point.h"
+#include "gen_cpp/cloud.pb.h"
+#include "olap/compaction.h"
+#include "olap/cumulative_compaction_policy.h"
+#include "service/backend_options.h"
+#include "util/trace.h"
+#include "util/uuid_generator.h"
+
+namespace doris {
+using namespace ErrorCode;
+
+bvar::Adder<uint64_t> cumu_output_size("cumu_compaction", "output_size");
+
+CloudCumulativeCompaction::CloudCumulativeCompaction(CloudStorageEngine& 
engine, CloudTabletSPtr tablet)
+        : CloudCompactionMixin(engine, std::move(tablet), "BaseCompaction:" + 
std::to_string(tablet->tablet_id())) {
+    auto uuid = UUIDGenerator::instance()->next_uuid();
+    std::stringstream ss;
+    ss << uuid;
+    _uuid = ss.str();
+}
+
+CloudCumulativeCompaction::~CloudCumulativeCompaction() = default;
+
+Status CloudCumulativeCompaction::prepare_compact() {

Review Comment:
   warning: function 'prepare_compact' exceeds recommended size/complexity 
thresholds [readability-function-size]
   ```cpp
   Status CloudCumulativeCompaction::prepare_compact() {
                                     ^
   ```
   <details>
   <summary>Additional context</summary>
   
   **be/src/cloud/cloud_cumulative_compaction.cpp:30:** 115 lines including 
whitespace and comments (threshold 80)
   ```cpp
   Status CloudCumulativeCompaction::prepare_compact() {
                                     ^
   ```
   
   </details>
   



##########
be/src/cloud/cloud_cumulative_compaction_policy.cpp:
##########
@@ -0,0 +1,205 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "cloud/cloud_cumulative_compaction_policy.h"
+
+#include <algorithm>
+#include <list>
+#include <ostream>
+#include <string>
+
+#include "common/config.h"
+#include "common/logging.h"
+#include "common/sync_point.h"
+#include "cloud/config.h"
+#include "olap/olap_common.h"
+#include "olap/tablet.h"
+#include "olap/tablet_meta.h"
+
+namespace doris {
+
+CloudSizeBasedCumulativeCompactionPolicy::CloudSizeBasedCumulativeCompactionPolicy(
+        int64_t promotion_size, double promotion_ratio, int64_t 
promotion_min_size,
+        int64_t compaction_min_size)
+        : _promotion_size(promotion_size),
+          _promotion_ratio(promotion_ratio),
+          _promotion_min_size(promotion_min_size),
+          _compaction_min_size(compaction_min_size) {}
+
+int64_t CloudSizeBasedCumulativeCompactionPolicy::_level_size(const int64_t 
size) {
+    if (size < 1024) return 0;
+    int64_t max_level = (int64_t)1
+                        << (sizeof(_promotion_size) * 8 - 1 - 
__builtin_clzl(_promotion_size / 2));
+    if (size >= max_level) return max_level;
+    return (int64_t)1 << (sizeof(size) * 8 - 1 - __builtin_clzl(size));
+}
+
+int CloudSizeBasedCumulativeCompactionPolicy::pick_input_rowsets(

Review Comment:
   warning: function 'pick_input_rowsets' exceeds recommended size/complexity 
thresholds [readability-function-size]
   ```cpp
   int CloudSizeBasedCumulativeCompactionPolicy::pick_input_rowsets(
                                                 ^
   ```
   <details>
   <summary>Additional context</summary>
   
   **be/src/cloud/cloud_cumulative_compaction_policy.cpp:50:** 126 lines 
including whitespace and comments (threshold 80)
   ```cpp
   int CloudSizeBasedCumulativeCompactionPolicy::pick_input_rowsets(
                                                 ^
   ```
   
   </details>
   



##########
be/src/cloud/cloud_cumulative_compaction_policy.h:
##########
@@ -0,0 +1,73 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <stddef.h>
+#include <stdint.h>
+
+#include <memory>
+#include <string>
+#include <vector>
+
+#include "common/config.h"
+#include "cloud/cloud_tablet.h"
+#include "olap/rowset/rowset.h"
+#include "olap/rowset/rowset_meta.h"
+
+namespace doris {
+
+class Tablet;
+struct Version;
+
+class CloudSizeBasedCumulativeCompactionPolicy {
+public:
+    CloudSizeBasedCumulativeCompactionPolicy(
+            int64_t promotion_size = config::compaction_promotion_size_mbytes 
* 1024 * 1024,
+            double promotion_ratio = config::compaction_promotion_ratio,
+            int64_t promotion_min_size = 
config::compaction_promotion_min_size_mbytes * 1024 * 1024,
+            int64_t compaction_min_size = config::compaction_min_size_mbytes * 
1024 * 1024);
+
+    ~CloudSizeBasedCumulativeCompactionPolicy() {}
+
+    int64_t new_cumulative_point(CloudTablet* tablet, const RowsetSharedPtr& 
output_rowset,
+                                 Version& last_delete_version,
+                                 int64_t last_cumulative_point);
+
+    int pick_input_rowsets(CloudTablet* tablet, const 
std::vector<RowsetSharedPtr>& candidate_rowsets,
+                           const int64_t max_compaction_score, const int64_t 
min_compaction_score,
+                           std::vector<RowsetSharedPtr>* input_rowsets,
+                           Version* last_delete_version, size_t* 
compaction_score,
+                           bool allow_delete = false);
+
+private:
+    int64_t _level_size(const int64_t size);
+
+    int64_t cloud_promotion_size(CloudTablet* tablet) const;
+
+private:

Review Comment:
   warning: redundant access specifier has the same accessibility as the 
previous access specifier [readability-redundant-access-specifiers]
   
   ```suggestion
   
   ```
   <details>
   <summary>Additional context</summary>
   
   **be/src/cloud/cloud_cumulative_compaction_policy.h:56:** previously 
declared here
   ```cpp
   private:
   ^
   ```
   
   </details>
   



##########
be/src/cloud/cloud_cumulative_compaction.cpp:
##########
@@ -0,0 +1,457 @@
+#include "cloud/cloud_cumulative_compaction.h"
+
+#include "cloud/config.h"
+#include "cloud/cloud_meta_mgr.h"
+#include "common/config.h"
+#include "common/logging.h"
+#include "common/status.h"
+#include "common/sync_point.h"
+#include "gen_cpp/cloud.pb.h"
+#include "olap/compaction.h"
+#include "olap/cumulative_compaction_policy.h"
+#include "service/backend_options.h"
+#include "util/trace.h"
+#include "util/uuid_generator.h"
+
+namespace doris {
+using namespace ErrorCode;
+
+bvar::Adder<uint64_t> cumu_output_size("cumu_compaction", "output_size");
+
+CloudCumulativeCompaction::CloudCumulativeCompaction(CloudStorageEngine& 
engine, CloudTabletSPtr tablet)
+        : CloudCompactionMixin(engine, std::move(tablet), "BaseCompaction:" + 
std::to_string(tablet->tablet_id())) {
+    auto uuid = UUIDGenerator::instance()->next_uuid();
+    std::stringstream ss;
+    ss << uuid;
+    _uuid = ss.str();
+}
+
+CloudCumulativeCompaction::~CloudCumulativeCompaction() = default;
+
+Status CloudCumulativeCompaction::prepare_compact() {
+    if (_tablet->tablet_state() != TABLET_RUNNING) {
+        return Status::InternalError("invalid tablet state. tablet_id={}", 
_tablet->tablet_id());
+    }
+
+    std::vector<std::shared_ptr<CloudCumulativeCompaction>> cumu_compactions;
+    _engine.get_cumu_compaction(_tablet->tablet_id(), cumu_compactions);
+    if (!cumu_compactions.empty()) {
+        for (auto& cumu : cumu_compactions) {
+            _max_conflict_version =
+                    std::max(_max_conflict_version, 
cumu->_input_rowsets.back()->end_version());
+        }
+    }
+
+    int tried = 0;
+PREPARE_TRY_AGAIN:
+
+    bool need_sync_tablet = true;
+    {
+        std::shared_lock rlock(_tablet->get_header_lock());
+        // If number of rowsets is equal to approximate_num_rowsets, it is 
very likely that this tablet has been
+        // synchronized with meta-service.
+        if (_tablet->tablet_meta()->all_rs_metas().size() >=
+                    cloud_tablet()->fetch_add_approximate_num_rowsets(0) &&
+            cloud_tablet()->last_sync_time_s > 0) {
+            need_sync_tablet = false;
+        }
+    }
+    if (need_sync_tablet) {
+        RETURN_IF_ERROR(cloud_tablet()->sync_rowsets());
+    }
+
+    // pick rowsets to compact
+    auto st = pick_rowsets_to_compact();
+    if (!st.ok()) {
+        if (tried == 0 && _last_delete_version.first != -1) {
+            // we meet a delete version, should increase the cumulative point 
to let base compaction handle the delete version.
+            // plus 1 to skip the delete version.
+            // NOTICE: after that, the cumulative point may be larger than max 
version of this tablet, but it doesn't matter.
+            update_cumulative_point();
+        }
+        return st;
+    }
+
+    // prepare compaction job
+    cloud::TabletJobInfoPB job;
+    auto idx = job.mutable_idx();
+    idx->set_tablet_id(_tablet->tablet_id());
+    idx->set_table_id(_tablet->table_id());
+    idx->set_index_id(_tablet->index_id());
+    idx->set_partition_id(_tablet->partition_id());
+    auto compaction_job = job.add_compaction();
+    compaction_job->set_id(_uuid);
+    compaction_job->set_initiator(BackendOptions::get_localhost() + ':' +
+                                  
std::to_string(config::heartbeat_service_port));
+    compaction_job->set_type(cloud::TabletCompactionJobPB::CUMULATIVE);
+    compaction_job->set_base_compaction_cnt(_base_compaction_cnt);
+    compaction_job->set_cumulative_compaction_cnt(_cumulative_compaction_cnt);
+    using namespace std::chrono;
+    int64_t now = 
duration_cast<seconds>(system_clock::now().time_since_epoch()).count();
+    _expiration = now + config::compaction_timeout_seconds;
+    compaction_job->set_expiration(_expiration);
+    compaction_job->set_lease(now + config::lease_compaction_interval_seconds 
* 4);
+    if (config::enable_parallel_cumu_compaction) {
+        // Set input version range to let meta-service judge version range 
conflict
+        
compaction_job->add_input_versions(_input_rowsets.front()->start_version());
+        
compaction_job->add_input_versions(_input_rowsets.back()->end_version());
+    }
+    cloud::StartTabletJobResponse resp;
+    st = _engine.meta_mgr().prepare_tablet_job(job, &resp);
+    if (!st.ok()) {
+        if (resp.status().code() == cloud::STALE_TABLET_CACHE) {
+            // set last_sync_time to 0 to force sync tablet next time
+            cloud_tablet()->last_sync_time_s = 0;
+        } else if (resp.status().code() == cloud::TABLET_NOT_FOUND) {
+            // tablet not found
+            cloud_tablet()->recycle_cached_data();
+        } else if (resp.status().code() == cloud::JOB_TABLET_BUSY) {
+            if (config::enable_parallel_cumu_compaction && 
resp.version_in_compaction_size() > 0 &&
+                ++tried <= 2) {
+                _max_conflict_version = 
*std::max_element(resp.version_in_compaction().begin(),
+                                                          
resp.version_in_compaction().end());
+                LOG_INFO("retry pick input rowsets")
+                        .tag("job_id", _uuid)
+                        .tag("max_conflict_version", _max_conflict_version)
+                        .tag("tried", tried)
+                        .tag("msg", resp.status().msg());
+                goto PREPARE_TRY_AGAIN;
+            } else {
+                LOG_WARNING("failed to prepare cumu compaction")
+                        .tag("job_id", _uuid)
+                        .tag("msg", resp.status().msg());
+                return Status::Error<CUMULATIVE_NO_SUITABLE_VERSION>("no 
suitable versions");
+            }
+        }
+        return st;
+    }
+
+    for (auto& rs : _input_rowsets) {
+        _input_row_num += rs->num_rows();
+        _input_segments += rs->num_segments();
+        _input_rowsets_size += rs->data_disk_size();
+    }
+    LOG_INFO("start CloudCumulativeCompaction, tablet_id={}, range=[{}-{}]", 
_tablet->tablet_id(),
+             _input_rowsets.front()->start_version(), 
_input_rowsets.back()->end_version())
+            .tag("job_id", _uuid)
+            .tag("input_rowsets", _input_rowsets.size())
+            .tag("input_rows", _input_row_num)
+            .tag("input_segments", _input_segments)
+            .tag("input_data_size", _input_rowsets_size)
+            .tag("tablet_max_version", cloud_tablet()->max_version_unlocked())
+            .tag("cumulative_point", cloud_tablet()->cumulative_layer_point())
+            .tag("num_rowsets", 
cloud_tablet()->fetch_add_approximate_num_rowsets(0))
+            .tag("cumu_num_rowsets", 
cloud_tablet()->fetch_add_approximate_cumu_num_rowsets(0));
+    return st;
+}
+
+Status CloudCumulativeCompaction::execute_compact() {
+    
TEST_SYNC_POINT_RETURN_WITH_VALUE("CloudCumulativeCompaction::execute_compact_impl",
+                                      Status::OK(), this);
+    using namespace std::chrono;
+    auto start = steady_clock::now();
+    RETURN_IF_ERROR(CloudCompactionMixin::execute_compact());
+    LOG_INFO("finish CloudCumulativeCompaction, tablet_id={}, cost={}ms", 
_tablet->tablet_id(),
+             duration_cast<milliseconds>(steady_clock::now() - start).count())
+            .tag("job_id", _uuid)
+            .tag("input_rowsets", _input_rowsets.size())
+            .tag("input_rows", _input_row_num)
+            .tag("input_segments", _input_segments)
+            .tag("input_data_size", _input_rowsets_size)
+            .tag("output_rows", _output_rowset->num_rows())
+            .tag("output_segments", _output_rowset->num_segments())
+            .tag("output_data_size", _output_rowset->data_disk_size())
+            .tag("tablet_max_version", _tablet->max_version_unlocked())
+            .tag("cumulative_point", cloud_tablet()->cumulative_layer_point())
+            .tag("num_rowsets", 
cloud_tablet()->fetch_add_approximate_num_rowsets(0))
+            .tag("cumu_num_rowsets", 
cloud_tablet()->fetch_add_approximate_cumu_num_rowsets(0));
+
+    _state = CompactionState::SUCCESS;
+
+    
DorisMetrics::instance()->cumulative_compaction_deltas_total->increment(_input_rowsets.size());
+    
DorisMetrics::instance()->cumulative_compaction_bytes_total->increment(_input_rowsets_size);
+    cumu_output_size << _output_rowset->data_disk_size();
+
+    return Status::OK();
+}
+
+Status CloudCumulativeCompaction::modify_rowsets() {

Review Comment:
   warning: function 'modify_rowsets' exceeds recommended size/complexity 
thresholds [readability-function-size]
   ```cpp
   Status CloudCumulativeCompaction::modify_rowsets() {
                                     ^
   ```
   <details>
   <summary>Additional context</summary>
   
   **be/src/cloud/cloud_cumulative_compaction.cpp:177:** 102 lines including 
whitespace and comments (threshold 80)
   ```cpp
   Status CloudCumulativeCompaction::modify_rowsets() {
                                     ^
   ```
   
   </details>
   



##########
be/src/cloud/cloud_cumulative_compaction_policy.h:
##########
@@ -0,0 +1,73 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <stddef.h>
+#include <stdint.h>
+
+#include <memory>
+#include <string>
+#include <vector>
+
+#include "common/config.h"
+#include "cloud/cloud_tablet.h"
+#include "olap/rowset/rowset.h"
+#include "olap/rowset/rowset_meta.h"
+
+namespace doris {
+
+class Tablet;
+struct Version;
+
+class CloudSizeBasedCumulativeCompactionPolicy {
+public:
+    CloudSizeBasedCumulativeCompactionPolicy(
+            int64_t promotion_size = config::compaction_promotion_size_mbytes 
* 1024 * 1024,
+            double promotion_ratio = config::compaction_promotion_ratio,
+            int64_t promotion_min_size = 
config::compaction_promotion_min_size_mbytes * 1024 * 1024,
+            int64_t compaction_min_size = config::compaction_min_size_mbytes * 
1024 * 1024);
+
+    ~CloudSizeBasedCumulativeCompactionPolicy() {}
+
+    int64_t new_cumulative_point(CloudTablet* tablet, const RowsetSharedPtr& 
output_rowset,
+                                 Version& last_delete_version,
+                                 int64_t last_cumulative_point);
+
+    int pick_input_rowsets(CloudTablet* tablet, const 
std::vector<RowsetSharedPtr>& candidate_rowsets,
+                           const int64_t max_compaction_score, const int64_t 
min_compaction_score,

Review Comment:
   warning: parameter 'max_compaction_score' is const-qualified in the function 
declaration; const-qualification of parameters only has an effect in function 
definitions [readability-avoid-const-params-in-decls]
   
   ```suggestion
                              int64_t max_compaction_score, const int64_t 
min_compaction_score,
   ```
   



##########
be/src/cloud/cloud_tablet.h:
##########
@@ -108,6 +108,75 @@ class CloudTablet final : public BaseTablet {
     void set_base_compaction_cnt(int64_t cnt) { _base_compaction_cnt = cnt; }
     void set_cumulative_compaction_cnt(int64_t cnt) { 
_cumulative_compaction_cnt = cnt; }
     void set_cumulative_layer_point(int64_t new_point);
+    
+    /*
+    int64_t base_compaction_cnt() const { return _base_compaction_cnt; }
+    void set_base_compaction_cnt(int64_t cnt) { _base_compaction_cnt = cnt; }
+    int64_t cumulative_compaction_cnt() const { return 
_cumulative_compaction_cnt; }
+    void set_cumulative_compaction_cnt(int64_t cnt) { 
_cumulative_compaction_cnt = cnt; }
+    int64_t full_compaction_cnt() const { return _full_compaction_cnt; }
+    void set_full_compaction_cnt(int64_t cnt) { _full_compaction_cnt = cnt; }
+    */
+
+    int64_t last_cumu_compaction_failure_time() { return 
_last_cumu_compaction_failure_millis; }
+    void set_last_cumu_compaction_failure_time(int64_t millis) {
+        _last_cumu_compaction_failure_millis = millis;
+    }
+
+    int64_t last_base_compaction_failure_time() { return 
_last_base_compaction_failure_millis; }
+    void set_last_base_compaction_failure_time(int64_t millis) {
+        _last_base_compaction_failure_millis = millis;
+    }
+
+    int64_t last_full_compaction_failure_time() { return 
_last_full_compaction_failure_millis; }
+    void set_last_full_compaction_failure_time(int64_t millis) {
+        _last_full_compaction_failure_millis = millis;
+    }
+
+    int64_t last_cumu_compaction_success_time() { return 
_last_cumu_compaction_success_millis; }
+    void set_last_cumu_compaction_success_time(int64_t millis) {
+        _last_cumu_compaction_success_millis = millis;
+    }
+
+    int64_t last_base_compaction_success_time() { return 
_last_base_compaction_success_millis; }
+    void set_last_base_compaction_success_time(int64_t millis) {
+        _last_base_compaction_success_millis = millis;
+    }
+
+    int64_t last_full_compaction_success_time() { return 
_last_full_compaction_success_millis; }
+    void set_last_full_compaction_success_time(int64_t millis) {
+        _last_full_compaction_success_millis = millis;
+    }
+
+    int64_t last_base_compaction_schedule_time() { return 
_last_base_compaction_schedule_millis; }
+    void set_last_base_compaction_schedule_time(int64_t millis) {
+        _last_base_compaction_schedule_millis = millis;
+    }
+
+    std::vector<RowsetSharedPtr> pick_candidate_rowsets_to_base_compaction();
+
+    void traverse_rowsets(std::function<void(const RowsetSharedPtr&)> visitor,

Review Comment:
   warning: method 'traverse_rowsets' can be made static 
[readability-convert-member-functions-to-static]
   
   ```suggestion
       static void traverse_rowsets(std::function<void(const RowsetSharedPtr&)> 
visitor,
   ```
   



##########
be/src/cloud/cloud_tablet.h:
##########
@@ -108,6 +108,75 @@
     void set_base_compaction_cnt(int64_t cnt) { _base_compaction_cnt = cnt; }
     void set_cumulative_compaction_cnt(int64_t cnt) { 
_cumulative_compaction_cnt = cnt; }
     void set_cumulative_layer_point(int64_t new_point);
+    
+    /*
+    int64_t base_compaction_cnt() const { return _base_compaction_cnt; }
+    void set_base_compaction_cnt(int64_t cnt) { _base_compaction_cnt = cnt; }
+    int64_t cumulative_compaction_cnt() const { return 
_cumulative_compaction_cnt; }
+    void set_cumulative_compaction_cnt(int64_t cnt) { 
_cumulative_compaction_cnt = cnt; }
+    int64_t full_compaction_cnt() const { return _full_compaction_cnt; }
+    void set_full_compaction_cnt(int64_t cnt) { _full_compaction_cnt = cnt; }
+    */
+
+    int64_t last_cumu_compaction_failure_time() { return 
_last_cumu_compaction_failure_millis; }
+    void set_last_cumu_compaction_failure_time(int64_t millis) {
+        _last_cumu_compaction_failure_millis = millis;
+    }
+
+    int64_t last_base_compaction_failure_time() { return 
_last_base_compaction_failure_millis; }
+    void set_last_base_compaction_failure_time(int64_t millis) {
+        _last_base_compaction_failure_millis = millis;
+    }
+
+    int64_t last_full_compaction_failure_time() { return 
_last_full_compaction_failure_millis; }
+    void set_last_full_compaction_failure_time(int64_t millis) {
+        _last_full_compaction_failure_millis = millis;
+    }
+
+    int64_t last_cumu_compaction_success_time() { return 
_last_cumu_compaction_success_millis; }
+    void set_last_cumu_compaction_success_time(int64_t millis) {
+        _last_cumu_compaction_success_millis = millis;
+    }
+
+    int64_t last_base_compaction_success_time() { return 
_last_base_compaction_success_millis; }
+    void set_last_base_compaction_success_time(int64_t millis) {
+        _last_base_compaction_success_millis = millis;
+    }
+
+    int64_t last_full_compaction_success_time() { return 
_last_full_compaction_success_millis; }
+    void set_last_full_compaction_success_time(int64_t millis) {
+        _last_full_compaction_success_millis = millis;
+    }
+
+    int64_t last_base_compaction_schedule_time() { return 
_last_base_compaction_schedule_millis; }
+    void set_last_base_compaction_schedule_time(int64_t millis) {
+        _last_base_compaction_schedule_millis = millis;
+    }
+
+    std::vector<RowsetSharedPtr> pick_candidate_rowsets_to_base_compaction();
+
+    void traverse_rowsets(std::function<void(const RowsetSharedPtr&)> visitor,
+                          bool include_stale = false) {
+        std::shared_lock rlock(_meta_lock);
+        for (auto& [v, rs] : _rs_version_map) {
+            visitor(rs);
+        }
+        if (!include_stale) return;

Review Comment:
   warning: statement should be inside braces 
[readability-braces-around-statements]
   
   ```suggestion
           if (!include_stale) { return;
   }
   ```
   



##########
be/src/cloud/cloud_storage_engine.cpp:
##########
@@ -190,4 +234,418 @@
     }
 }
 
+void CloudStorageEngine::get_cumu_compaction(
+        int64_t tablet_id, 
std::vector<std::shared_ptr<CloudCumulativeCompaction>>& res) {
+    std::lock_guard lock(_compaction_mtx);
+    if (auto it = _submitted_cumu_compactions.find(tablet_id);
+        it != _submitted_cumu_compactions.end()) {
+        res = it->second;
+    }
+}
+
+// Keeps the base and cumulative compaction thread pools sized to the currently configured
+// thread counts (get_base_thread_num() / get_cumu_thread_num()); both min_threads and
+// max_threads of each pool are set to that count.
+void CloudStorageEngine::_adjust_compaction_thread_num() {
+    // Makes both min_threads and max_threads of `pool` equal to `target`, logging each change.
+    // The bounds are updated in an order that never requests min_threads > max_threads:
+    // when growing, max is raised before min; when shrinking, min is lowered before max.
+    // Always setting max first would ask for max < min while shrinking, which ThreadPool
+    // presumably rejects (NOTE(review): confirm against ThreadPool::set_max_threads), silently
+    // delaying the shrink until the next producer round.
+    auto resize_pool = [](ThreadPool* pool, int target, const char* pool_type) {
+        auto update_max = [&]() {
+            int old_max_threads = pool->max_threads();
+            if (old_max_threads == target) {
+                return;
+            }
+            Status status = pool->set_max_threads(target);
+            if (status.ok()) {
+                VLOG_NOTICE << "update " << pool_type
+                            << " compaction thread pool max_threads from " << old_max_threads
+                            << " to " << target;
+            } else {
+                LOG(WARNING) << "failed to update " << pool_type
+                             << " compaction thread pool max_threads from " << old_max_threads
+                             << " to " << target << ", err: " << status;
+            }
+        };
+        auto update_min = [&]() {
+            int old_min_threads = pool->min_threads();
+            if (old_min_threads == target) {
+                return;
+            }
+            Status status = pool->set_min_threads(target);
+            if (status.ok()) {
+                VLOG_NOTICE << "update " << pool_type
+                            << " compaction thread pool min_threads from " << old_min_threads
+                            << " to " << target;
+            } else {
+                LOG(WARNING) << "failed to update " << pool_type
+                             << " compaction thread pool min_threads from " << old_min_threads
+                             << " to " << target << ", err: " << status;
+            }
+        };
+        if (target >= pool->max_threads()) {
+            // Growing (or unchanged max): raise the upper bound first.
+            update_max();
+            update_min();
+        } else {
+            // Shrinking: lower the lower bound first so max never drops below min.
+            update_min();
+            update_max();
+        }
+    };
+
+    resize_pool(_base_compaction_thread_pool.get(), get_base_thread_num(), "base");
+    resize_pool(_cumu_compaction_thread_pool.get(), get_cumu_thread_num(), "cumu");
+}
+
+// Background loop that periodically picks tablets and submits compaction tasks.
+// Each round is either cumulative or base: after
+// `cumulative_compaction_rounds_for_each_base_compaction_round` cumulative rounds, one base
+// round runs. The loop sleeps on `_stop_background_threads_latch` between rounds and exits
+// once that latch is released.
+void CloudStorageEngine::_compaction_tasks_producer_callback() {
+    LOG(INFO) << "try to start compaction producer process!";
+
+    // Number of cumulative rounds performed since the last base round.
+    int cumu_rounds_done = 0;
+    CompactionType compaction_type;
+
+    // Picking tablets also refreshes the compaction score metric, but picking stops early
+    // when no thread slot is free, which would also stop the metric updates. To keep the
+    // metric fresh, each compaction type forces a score check at most once every
+    // `check_score_interval_ms`.
+    int64_t last_cumu_score_check_ms = 0;
+    int64_t last_base_score_check_ms = 0;
+    static const int64_t check_score_interval_ms = 5000; // 5 secs
+
+    int64_t wait_ms = config::generate_compaction_tasks_interval_ms;
+    do {
+        if (config::disable_auto_compaction) {
+            // Auto compaction is switched off: only re-check the switch periodically.
+            wait_ms = config::check_auto_compaction_interval_seconds * 1000;
+        } else {
+            _adjust_compaction_thread_num();
+
+            const int64_t now_ms = UnixMillis();
+            bool check_score = false;
+            if (cumu_rounds_done >=
+                config::cumulative_compaction_rounds_for_each_base_compaction_round) {
+                // Enough cumulative rounds have run: do one base round and restart the count.
+                compaction_type = CompactionType::BASE_COMPACTION;
+                cumu_rounds_done = 0;
+                if (now_ms - last_base_score_check_ms >= check_score_interval_ms) {
+                    check_score = true;
+                    last_base_score_check_ms = now_ms;
+                }
+            } else {
+                compaction_type = CompactionType::CUMULATIVE_COMPACTION;
+                ++cumu_rounds_done;
+                if (now_ms - last_cumu_score_check_ms >= check_score_interval_ms) {
+                    check_score = true;
+                    last_cumu_score_check_ms = now_ms;
+                }
+            }
+
+            const bool is_cumu = compaction_type == CompactionType::CUMULATIVE_COMPACTION;
+            std::unique_ptr<ThreadPool>& pool =
+                    is_cumu ? _cumu_compaction_thread_pool : _base_compaction_thread_pool;
+            VLOG_CRITICAL << "compaction thread pool. type: " << (is_cumu ? "CUMU" : "BASE")
+                          << ", num_threads: " << pool->num_threads()
+                          << ", num_threads_pending_start: " << pool->num_threads_pending_start()
+                          << ", num_active_threads: " << pool->num_active_threads()
+                          << ", max_threads: " << pool->max_threads()
+                          << ", min_threads: " << pool->min_threads()
+                          << ", num_total_queued_tasks: " << pool->get_queue_size();
+
+            std::vector<CloudTabletSPtr> picked_tablets =
+                    _generate_cloud_compaction_tasks(compaction_type, check_score);
+
+            // Submit every picked tablet. "No suitable version" failures are expected and are
+            // only logged when debug VLOG is enabled; any other failure is always logged.
+            for (const auto& tablet : picked_tablets) {
+                Status st = submit_compaction_task(tablet, compaction_type);
+                if (st.ok()) {
+                    continue;
+                }
+                const bool no_suitable_version =
+                        st.is<ErrorCode::BE_NO_SUITABLE_VERSION>() ||
+                        st.is<ErrorCode::CUMULATIVE_NO_SUITABLE_VERSION>();
+                if (!no_suitable_version || VLOG_DEBUG_IS_ON) {
+                    LOG(WARNING) << "failed to submit compaction task for tablet: "
+                                 << tablet->tablet_id() << ", err: " << st;
+                }
+            }
+            wait_ms = config::generate_compaction_tasks_interval_ms;
+        }
+    } while (!_stop_background_threads_latch.wait_for(std::chrono::milliseconds(wait_ms)));
+}
+
+std::vector<CloudTabletSPtr> 
CloudStorageEngine::_generate_cloud_compaction_tasks(
+        CompactionType compaction_type, bool check_score) {
+    std::vector<std::shared_ptr<CloudTablet>> tablets_compaction;
+
+    int64_t max_compaction_score = 0;
+    std::unordered_set<int64_t> tablet_preparing_cumu_compaction;
+    std::unordered_map<int64_t, 
std::vector<std::shared_ptr<CloudCumulativeCompaction>>>
+            submitted_cumu_compactions;
+    std::unordered_map<int64_t, std::shared_ptr<CloudBaseCompaction>> 
submitted_base_compactions;
+    std::unordered_map<int64_t, std::shared_ptr<CloudFullCompaction>> 
submitted_full_compactions;
+    {
+        std::lock_guard lock(_compaction_mtx);
+        tablet_preparing_cumu_compaction = _tablet_preparing_cumu_compaction;
+        submitted_cumu_compactions = _submitted_cumu_compactions;
+        submitted_base_compactions = _submitted_base_compactions;
+        submitted_full_compactions = _submitted_full_compactions;
+    }
+
+    bool need_pick_tablet = true;
+    int thread_per_disk =
+            config::compaction_task_num_per_fast_disk; // all disks are fast 
in cloud mode
+    int num_cumu =
+            std::accumulate(submitted_cumu_compactions.begin(), 
submitted_cumu_compactions.end(), 0,
+                            [](int a, auto& b) { return a + b.second.size(); 
});
+    int num_base = submitted_base_compactions.size() + 
submitted_full_compactions.size();
+    int n = thread_per_disk - num_cumu - num_base;
+    if (compaction_type == CompactionType::BASE_COMPACTION) {
+        // We need to reserve at least one thread for cumulative compaction,
+        // because base compactions may take too long to complete, which may
+        // leads to "too many rowsets" error.
+        int base_n = std::min(config::max_base_compaction_task_num_per_disk, 
thread_per_disk - 1) -
+                     num_base;
+        n = std::min(base_n, n);
+    }
+    if (n <= 0) { // No threads available
+        if (!check_score) return tablets_compaction;

Review Comment:
   warning: statement should be inside braces 
[readability-braces-around-statements]
   
   ```suggestion
           if (!check_score) { return tablets_compaction;
   }
   ```
   



##########
be/src/cloud/cloud_tablet.h:
##########
@@ -108,6 +108,75 @@
     void set_base_compaction_cnt(int64_t cnt) { _base_compaction_cnt = cnt; }
     void set_cumulative_compaction_cnt(int64_t cnt) { 
_cumulative_compaction_cnt = cnt; }
     void set_cumulative_layer_point(int64_t new_point);
+    
+    // Timestamps of the most recent failure and success of each compaction type, and of the
+    // most recent base compaction scheduling. Per the `*_millis` member names these are in
+    // milliseconds -- NOTE(review): presumably epoch millis (e.g. UnixMillis()); confirm with callers.
+    int64_t last_cumu_compaction_failure_time() const {
+        return _last_cumu_compaction_failure_millis;
+    }
+    void set_last_cumu_compaction_failure_time(int64_t millis) {
+        _last_cumu_compaction_failure_millis = millis;
+    }
+
+    int64_t last_base_compaction_failure_time() const {
+        return _last_base_compaction_failure_millis;
+    }
+    void set_last_base_compaction_failure_time(int64_t millis) {
+        _last_base_compaction_failure_millis = millis;
+    }
+
+    int64_t last_full_compaction_failure_time() const {
+        return _last_full_compaction_failure_millis;
+    }
+    void set_last_full_compaction_failure_time(int64_t millis) {
+        _last_full_compaction_failure_millis = millis;
+    }
+
+    int64_t last_cumu_compaction_success_time() const {
+        return _last_cumu_compaction_success_millis;
+    }
+    void set_last_cumu_compaction_success_time(int64_t millis) {
+        _last_cumu_compaction_success_millis = millis;
+    }
+
+    int64_t last_base_compaction_success_time() const {
+        return _last_base_compaction_success_millis;
+    }
+    void set_last_base_compaction_success_time(int64_t millis) {
+        _last_base_compaction_success_millis = millis;
+    }
+
+    int64_t last_full_compaction_success_time() const {
+        return _last_full_compaction_success_millis;
+    }
+    void set_last_full_compaction_success_time(int64_t millis) {
+        _last_full_compaction_success_millis = millis;
+    }
+
+    int64_t last_base_compaction_schedule_time() const {
+        return _last_base_compaction_schedule_millis;
+    }
+    void set_last_base_compaction_schedule_time(int64_t millis) {
+        _last_base_compaction_schedule_millis = millis;
+    }
+
+    // Returns the candidate input rowsets for a base compaction. The caller visible in
+    // CloudBaseCompaction::pick_rowsets_to_compact() holds the header lock in shared mode.
+    std::vector<RowsetSharedPtr> pick_candidate_rowsets_to_base_compaction();
+
+    // Calls `visitor` once for each rowset in `_rs_version_map` and then, only if
+    // `include_stale` is true, once for each rowset in `_stale_rs_version_map`.
+    // `_meta_lock` is held in shared mode for the whole traversal, so `visitor` runs under it.
+    void traverse_rowsets(std::function<void(const RowsetSharedPtr&)> visitor,
+                          bool include_stale = false) {
+        std::shared_lock meta_rlock(_meta_lock);
+        for (const auto& version_and_rowset : _rs_version_map) {
+            visitor(version_and_rowset.second);
+        }
+        if (include_stale) {
+            for (const auto& version_and_rowset : _stale_rs_version_map) {
+                visitor(version_and_rowset.second);
+            }
+        }
+    }
+
+    inline Version max_version() const {

Review Comment:
   warning: method 'max_version' can be made static 
[readability-convert-member-functions-to-static]
   
   ```suggestion
       static inline Version max_version() {
   ```
   



##########
be/src/cloud/cloud_cumulative_compaction_policy.cpp:
##########
@@ -0,0 +1,205 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "cloud/cloud_cumulative_compaction_policy.h"
+
+#include <algorithm>
+#include <list>
+#include <ostream>
+#include <string>
+
+#include "common/config.h"
+#include "common/logging.h"
+#include "common/sync_point.h"
+#include "cloud/config.h"
+#include "olap/olap_common.h"
+#include "olap/tablet.h"
+#include "olap/tablet_meta.h"
+
+namespace doris {
+
+// Stores the size-based policy parameters:
+//   promotion_size / promotion_min_size - upper / lower bounds applied by cloud_promotion_size();
+//                                         promotion_size also caps the levels in _level_size().
+//   promotion_ratio                     - multiplier applied to the tablet's base size in
+//                                         cloud_promotion_size().
+//   compaction_min_size                 - total input size below which pick_input_rowsets() may
+//                                         drop a low-score selection.
+CloudSizeBasedCumulativeCompactionPolicy::CloudSizeBasedCumulativeCompactionPolicy(
+        int64_t promotion_size, double promotion_ratio, int64_t 
promotion_min_size,
+        int64_t compaction_min_size)
+        : _promotion_size(promotion_size),
+          _promotion_ratio(promotion_ratio),
+          _promotion_min_size(promotion_min_size),
+          _compaction_min_size(compaction_min_size) {}
+
+// Maps a rowset size (bytes) to its "level": the largest power of two not exceeding `size`.
+// Sizes below 1KB are level 0. Levels are capped at the largest power of two not exceeding
+// `_promotion_size / 2`.
+int64_t CloudSizeBasedCumulativeCompactionPolicy::_level_size(const int64_t size) {
+    if (size < 1024) {
+        return 0;
+    }
+    // __builtin_clzll is undefined for 0, so a promotion size below 2 (half <= 0) would be UB.
+    // Treat it as a cap of 1, the value the formula yields for the smallest valid half (1).
+    const int64_t half_promotion_size = _promotion_size / 2;
+    int64_t max_level = 1;
+    if (half_promotion_size > 0) {
+        max_level = int64_t {1}
+                    << (63 - __builtin_clzll(static_cast<unsigned long long>(half_promotion_size)));
+    }
+    if (size >= max_level) {
+        return max_level;
+    }
+    // size >= 1024 here, so the clz argument is non-zero.
+    return int64_t {1} << (63 - __builtin_clzll(static_cast<unsigned long long>(size)));
+}
+
+// Picks the input rowsets for one cumulative compaction of `tablet` from `candidate_rowsets`.
+//
+// First pass: walk the candidates in order, appending them to `input_rowsets` until the
+// accumulated compaction score reaches `max_compaction_score`. Unless `allow_delete` is set, a
+// rowset with a delete predicate is recorded in `*last_delete_version`. It then either ends the
+// pass (if rowsets were already collected) or is skipped. While the tablet is NOTREADY, rowsets
+// whose end version is more than 10 below the tablet's max version are skipped.
+//
+// If a delete version was recorded (checked as `last_delete_version->first != -1`;
+// NOTE(review): assumes the caller initializes it to -1), the selection is returned as is,
+// except that a single non-overlapping rowset is dropped.
+//
+// Otherwise, leading rowsets whose size level is higher than the level of everything after
+// them are trimmed. The selection is cleared when it is both smaller than `_compaction_min_size`
+// and scores below `min_compaction_score`, or when it is a single non-overlapping rowset.
+//
+// Returns the number of rowsets collected by the first pass (before trimming); the final score
+// of `input_rowsets` is written to `*compaction_score`.
+int CloudSizeBasedCumulativeCompactionPolicy::pick_input_rowsets(
+        CloudTablet* tablet, const std::vector<RowsetSharedPtr>& candidate_rowsets,
+        const int64_t max_compaction_score, const int64_t min_compaction_score,
+        std::vector<RowsetSharedPtr>* input_rowsets, Version* last_delete_version,
+        size_t* compaction_score, bool allow_delete) {
+    auto max_version = tablet->max_version().first;
+    int transient_size = 0;
+    *compaction_score = 0;
+    int64_t total_size = 0;
+    for (auto& rowset : candidate_rowsets) {
+        // check whether this rowset is delete version
+        if (!allow_delete && rowset->rowset_meta()->has_delete_predicate()) {
+            *last_delete_version = rowset->version();
+            if (!input_rowsets->empty()) {
+                // We meet a delete version and there were other versions before it: compact
+                // those versions before handing them over to base compaction.
+                break;
+            } else {
+                // We meet a delete version with no other versions before it: skip it and continue.
+                input_rowsets->clear();
+                *compaction_score = 0;
+                transient_size = 0;
+                continue;
+            }
+        }
+        if (tablet->tablet_state() == TABLET_NOTREADY) {
+            // If the tablet is under alter, keep the latest 10 versions so that the base tablet's
+            // max version is not merged in the new tablet, and data can still be copied from the
+            // base tablet.
+            if (rowset->version().second < max_version - 10) {
+                continue;
+            }
+        }
+        if (*compaction_score >= max_compaction_score) {
+            // got enough segments
+            break;
+        }
+        *compaction_score += rowset->rowset_meta()->get_compaction_score();
+        total_size += rowset->rowset_meta()->total_disk_size();
+
+        transient_size += 1;
+        input_rowsets->push_back(rowset);
+    }
+
+    // If there is a delete version, do compaction directly.
+    if (last_delete_version->first != -1) {
+        if (input_rowsets->size() == 1) {
+            auto rs_meta = input_rowsets->front()->rowset_meta();
+            // If there is only one rowset and it is not overlapping,
+            // we do not need to do cumulative compaction.
+            if (!rs_meta->is_segments_overlapping()) {
+                input_rowsets->clear();
+                *compaction_score = 0;
+            }
+        }
+        return transient_size;
+    }
+
+    // Trim leading rowsets whose size level exceeds the level of the remaining input.
+    auto rs_begin = input_rowsets->begin();
+    size_t new_compaction_score = *compaction_score;
+    while (rs_begin != input_rowsets->end()) {
+        auto& rs_meta = (*rs_begin)->rowset_meta();
+        // _level_size() returns int64_t. With a large promotion size a level can exceed
+        // INT_MAX, and narrowing it to int would corrupt the comparison below.
+        int64_t current_level = _level_size(rs_meta->total_disk_size());
+        int64_t remain_level = _level_size(total_size - rs_meta->total_disk_size());
+        // If the current level is not above the remaining level, the input rowsets start at
+        // the current rowset; otherwise the current rowset is excluded.
+        if (current_level <= remain_level) {
+            break;
+        }
+        total_size -= rs_meta->total_disk_size();
+        new_compaction_score -= rs_meta->get_compaction_score();
+        ++rs_begin;
+    }
+    if (rs_begin == input_rowsets->end()) { // No suitable level size found in `input_rowsets`
+        if (config::prioritize_query_perf_in_compaction && tablet->keys_type() != DUP_KEYS) {
+            // When the tablet's key type is not `DUP_KEYS`, compacting its rowsets has a
+            // significant positive impact on queries and reduces space amplification, so we
+            // ignore the level limitation and pick the candidate rowsets as input rowsets.
+            return transient_size;
+        } else if (*compaction_score >= max_compaction_score) {
+            // The score of `input_rowsets` exceeds the max compaction score, which means
+            // `input_rowsets` will never change and this tablet would never execute cumulative
+            // compaction. We MUST compact these `input_rowsets` to reduce the compaction score.
+            RowsetSharedPtr rs_with_max_score;
+            uint32_t max_score = 1;
+            for (auto& rs : *input_rowsets) {
+                if (rs->rowset_meta()->get_compaction_score() > max_score) {
+                    max_score = rs->rowset_meta()->get_compaction_score();
+                    rs_with_max_score = rs;
+                }
+            }
+            if (rs_with_max_score) {
+                input_rowsets->clear();
+                input_rowsets->push_back(std::move(rs_with_max_score));
+                *compaction_score = max_score;
+                return transient_size;
+            }
+            // Exceeding max compaction score, do compaction on all candidate rowsets anyway
+            return transient_size;
+        }
+    }
+    input_rowsets->erase(input_rowsets->begin(), rs_begin);
+    *compaction_score = new_compaction_score;
+
+    VLOG_CRITICAL << "cumulative compaction size_based policy, compaction_score = "
+                  << *compaction_score << ", total_size = " << total_size
+                  << ", tablet = " << tablet->tablet_id() << ", input_rowset size "
+                  << input_rowsets->size();
+
+    // empty return
+    if (input_rowsets->empty()) {
+        return transient_size;
+    }
+
+    // If we have a sufficient number of segments, we should process the compaction.
+    // Otherwise, check the number of segments and total_size to decide whether to compact.
+    if (total_size < _compaction_min_size && *compaction_score < min_compaction_score) {
+        input_rowsets->clear();
+        *compaction_score = 0;
+    } else if (total_size >= _compaction_min_size && input_rowsets->size() == 1) {
+        auto rs_meta = input_rowsets->front()->rowset_meta();
+        // If there is only one rowset and it is not overlapping,
+        // we do not need to do compaction.
+        if (!rs_meta->is_segments_overlapping()) {
+            input_rowsets->clear();
+            *compaction_score = 0;
+        }
+    }
+    return transient_size;
+}
+
+// Computes the promotion size for tablet `t` as `t->base_size() * _promotion_ratio` (truncated
+// to int64_t). The result is then bounded: anything above `_promotion_size` becomes
+// `_promotion_size`; otherwise anything below `_promotion_min_size` becomes `_promotion_min_size`.
+int64_t CloudSizeBasedCumulativeCompactionPolicy::cloud_promotion_size(CloudTablet* t) const {
+    const int64_t scaled_size = static_cast<int64_t>(t->base_size() * _promotion_ratio);
+    if (scaled_size > _promotion_size) {
+        return _promotion_size;
+    }
+    if (scaled_size < _promotion_min_size) {
+        return _promotion_min_size;
+    }
+    return scaled_size;
+}
+
+int64_t CloudSizeBasedCumulativeCompactionPolicy::new_cumulative_point(

Review Comment:
   warning: method 'new_cumulative_point' can be made static 
[readability-convert-member-functions-to-static]
   
   be/src/cloud/cloud_cumulative_compaction_policy.h:46:
   ```diff
   -     int64_t new_cumulative_point(CloudTablet* tablet, const 
RowsetSharedPtr& output_rowset,
   +     static int64_t new_cumulative_point(CloudTablet* tablet, const 
RowsetSharedPtr& output_rowset,
   ```
   



##########
be/src/cloud/cloud_cumulative_compaction_policy.h:
##########
@@ -0,0 +1,73 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <stddef.h>
+#include <stdint.h>

Review Comment:
   warning: inclusion of deprecated C++ header 'stdint.h'; consider using 
'cstdint' instead [modernize-deprecated-headers]
   
   ```suggestion
   #include <cstdint>
   ```
   



##########
be/src/cloud/cloud_cumulative_compaction_policy.cpp:
##########
@@ -0,0 +1,205 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "cloud/cloud_cumulative_compaction_policy.h"
+
+#include <algorithm>
+#include <list>
+#include <ostream>
+#include <string>
+
+#include "common/config.h"
+#include "common/logging.h"
+#include "common/sync_point.h"
+#include "cloud/config.h"
+#include "olap/olap_common.h"
+#include "olap/tablet.h"
+#include "olap/tablet_meta.h"
+
+namespace doris {
+
+// Stores the size-based policy parameters:
+//   promotion_size / promotion_min_size - upper / lower bounds applied by cloud_promotion_size();
+//                                         promotion_size also caps the levels in _level_size().
+//   promotion_ratio                     - multiplier applied to the tablet's base size in
+//                                         cloud_promotion_size().
+//   compaction_min_size                 - total input size below which pick_input_rowsets() may
+//                                         drop a low-score selection.
+CloudSizeBasedCumulativeCompactionPolicy::CloudSizeBasedCumulativeCompactionPolicy(
+        int64_t promotion_size, double promotion_ratio, int64_t 
promotion_min_size,
+        int64_t compaction_min_size)
+        : _promotion_size(promotion_size),
+          _promotion_ratio(promotion_ratio),
+          _promotion_min_size(promotion_min_size),
+          _compaction_min_size(compaction_min_size) {}
+
+int64_t CloudSizeBasedCumulativeCompactionPolicy::_level_size(const int64_t 
size) {
+    if (size < 1024) return 0;
+    int64_t max_level = (int64_t)1
+                        << (sizeof(_promotion_size) * 8 - 1 - 
__builtin_clzl(_promotion_size / 2));
+    if (size >= max_level) return max_level;

Review Comment:
   warning: statement should be inside braces 
[readability-braces-around-statements]
   
   ```suggestion
       if (size >= max_level) { return max_level;
   }
   ```
   



##########
be/src/cloud/cloud_base_compaction.cpp:
##########
@@ -0,0 +1,369 @@
+#include "cloud/cloud_base_compaction.h"
+
+#include <boost/container_hash/hash.hpp>
+
+#include "cloud/cloud_meta_mgr.h"
+#include "cloud/config.h"
+#include "common/config.h"
+#include "common/sync_point.h"
+#include "gen_cpp/cloud.pb.h"
+#include "olap/compaction.h"
+#include "olap/task/engine_checksum_task.h"
+#include "service/backend_options.h"
+#include "util/thread.h"
+#include "util/uuid_generator.h"
+
+namespace doris {
+using namespace ErrorCode;
+
+// Running total of the data_disk_size() of every base compaction output rowset; incremented in
+// CloudBaseCompaction::execute_compact(). NOTE(review): exported name is presumably
+// "base_compaction_output_size" (bvar prefix + name); confirm.
+bvar::Adder<uint64_t> base_output_size("base_compaction", "output_size");
+
+// Creates a base compaction for `tablet` labelled "BaseCompaction:<tablet_id>" and assigns it
+// a freshly generated job UUID (`_uuid`).
+// `tablet` is copied rather than moved into the mixin: the label argument also dereferences
+// `tablet`, and the evaluation order of function arguments is unspecified. If the mixin takes
+// the pointer by value, `std::move(tablet)` could empty it before `tablet->tablet_id()` runs.
+CloudBaseCompaction::CloudBaseCompaction(CloudStorageEngine& engine, CloudTabletSPtr tablet)
+        : CloudCompactionMixin(engine, tablet,
+                               "BaseCompaction:" + std::to_string(tablet->tablet_id())) {
+    auto uuid = UUIDGenerator::instance()->next_uuid();
+    std::stringstream ss;
+    ss << uuid;
+    _uuid = ss.str();
+}
+
+// Defaulted out of line; members are released by their own destructors.
+CloudBaseCompaction::~CloudBaseCompaction() = default;
+
+// Prepares a base compaction of `_tablet`:
+//   1. rejects tablets that are not TABLET_RUNNING;
+//   2. syncs rowsets from the meta-service unless the local view already looks up to date;
+//   3. picks the input rowsets (pick_rowsets_to_compact());
+//   4. registers the compaction job via `_engine.meta_mgr().prepare_tablet_job`.
+// If registration fails, a STALE_TABLET_CACHE response forces a sync next time
+// (last_sync_time_s = 0), and TABLET_NOT_FOUND recycles the tablet's cached data. In both
+// cases the error is returned.
+Status CloudBaseCompaction::prepare_compact() {
+    if (_tablet->tablet_state() != TABLET_RUNNING) {
+        return Status::InternalError("invalid tablet state. tablet_id={}", 
_tablet->tablet_id());
+    }
+
+    bool need_sync_tablet = true;
+    {
+        std::shared_lock rlock(_tablet->get_header_lock());
+        // If the number of local rowsets is at least approximate_num_rowsets and the tablet
+        // has been synced before, it is very likely already synchronized with meta-service.
+        if (_tablet->tablet_meta()->all_rs_metas().size() >=
+                    cloud_tablet()->fetch_add_approximate_num_rowsets(0) &&
+            cloud_tablet()->last_sync_time_s > 0) {
+            need_sync_tablet = false;
+        }
+    }
+    if (need_sync_tablet) {
+        RETURN_IF_ERROR(cloud_tablet()->sync_rowsets());
+    }
+
+    RETURN_IF_ERROR(pick_rowsets_to_compact());
+
+    // prepare compaction job
+    cloud::TabletJobInfoPB job;
+    auto idx = job.mutable_idx();
+    idx->set_tablet_id(_tablet->tablet_id());
+    idx->set_table_id(_tablet->table_id());
+    idx->set_index_id(_tablet->index_id());
+    idx->set_partition_id(_tablet->partition_id());
+    auto compaction_job = job.add_compaction();
+    compaction_job->set_id(_uuid);
+    // Initiator is "<backend host>:<heartbeat_service_port>".
+    compaction_job->set_initiator(BackendOptions::get_localhost() + ':' +
+                                  
std::to_string(config::heartbeat_service_port));
+    compaction_job->set_type(cloud::TabletCompactionJobPB::BASE);
+    // Counters snapshotted in pick_rowsets_to_compact().
+    compaction_job->set_base_compaction_cnt(_base_compaction_cnt);
+    compaction_job->set_cumulative_compaction_cnt(_cumulative_compaction_cnt);
+    using namespace std::chrono;
+    // Expiration and lease are absolute times in seconds since the epoch.
+    int64_t now = 
duration_cast<seconds>(system_clock::now().time_since_epoch()).count();
+    _expiration = now + config::compaction_timeout_seconds;
+    compaction_job->set_expiration(_expiration);
+    // Initial lease covers four lease-renewal intervals.
+    compaction_job->set_lease(now + config::lease_compaction_interval_seconds 
* 4);
+    cloud::StartTabletJobResponse resp;
+    // Register the job with the meta-service through the engine's meta manager.
+    auto st = _engine.meta_mgr().prepare_tablet_job(job, &resp);
+    if (!st.ok()) {
+        if (resp.status().code() == cloud::STALE_TABLET_CACHE) {
+            // set last_sync_time to 0 to force sync tablet next time
+            cloud_tablet()->last_sync_time_s = 0;
+        } else if (resp.status().code() == cloud::TABLET_NOT_FOUND) {
+            // tablet not found
+            cloud_tablet()->recycle_cached_data();
+        }
+        return st;
+    }
+
+    // Accumulate input statistics for logging and metrics.
+    for (auto& rs : _input_rowsets) {
+        _input_row_num += rs->num_rows();
+        _input_segments += rs->num_segments();
+        _input_rowsets_size += rs->data_disk_size();
+    }
+    // _input_rowsets is non-empty here: pick_rowsets_to_compact() fails on <= 1 rowset.
+    LOG_INFO("start CloudBaseCompaction, tablet_id={}, range=[{}-{}]", 
_tablet->tablet_id(),
+             _input_rowsets.front()->start_version(), 
_input_rowsets.back()->end_version())
+            .tag("job_id", _uuid)
+            .tag("input_rowsets", _input_rowsets.size())
+            .tag("input_rows", _input_row_num)
+            .tag("input_segments", _input_segments)
+            .tag("input_data_size", _input_rowsets_size);
+    return st;
+}
+
+// For duplicate-key tablets whose input rowsets carry no delete predicate, drops the leading
+// rowsets of `_input_rowsets` whose total disk size is at least
+// `base_compaction_dup_key_max_file_size_mbytes` MB, stopping at the first smaller rowset.
+// This avoids rewriting big files. Other tablets are left untouched.
+void CloudBaseCompaction::_filter_input_rowset() {
+    if (_tablet->keys_type() != KeysType::DUP_KEYS) {
+        return;
+    }
+    bool has_delete_predicate = false;
+    for (const auto& rowset : _input_rowsets) {
+        if (rowset->rowset_meta()->has_delete_predicate()) {
+            has_delete_predicate = true;
+            break;
+        }
+    }
+    if (has_delete_predicate) {
+        return;
+    }
+    int64_t max_size = config::base_compaction_dup_key_max_file_size_mbytes * 1024 * 1024;
+    // Advance past every oversized rowset at the front, then erase that prefix in one call.
+    auto first_kept = _input_rowsets.begin();
+    while (first_kept != _input_rowsets.end() &&
+           (*first_kept)->rowset_meta()->total_disk_size() >= max_size) {
+        ++first_kept;
+    }
+    _input_rowsets.erase(_input_rowsets.begin(), first_kept);
+}
+
+// Fills `_input_rowsets` with the base compaction candidates and decides whether compaction is
+// worthwhile. Fails if the candidates are not version-continuous, if at most one rowset
+// remains after filtering, or if the only inputs are an empty [0-1] base plus one rowset.
+// Otherwise returns OK when any of these holds:
+//   1. the number of input rowsets exceeds `base_compaction_min_rowset_num`;
+//   2. cumulative size / base size exceeds `base_compaction_min_data_ratio`;
+//   3. the base rowset is older than `base_compaction_interval_seconds_since_last_operation`.
+// If none holds, returns BE_NO_SUITABLE_VERSION.
+Status CloudBaseCompaction::pick_rowsets_to_compact() {
+    _input_rowsets.clear();
+    {
+        // Snapshot compaction counters and candidates under the header lock (shared mode).
+        std::shared_lock rlock(_tablet->get_header_lock());
+        _base_compaction_cnt = cloud_tablet()->base_compaction_cnt();
+        _cumulative_compaction_cnt = 
cloud_tablet()->cumulative_compaction_cnt();
+        _input_rowsets = 
cloud_tablet()->pick_candidate_rowsets_to_base_compaction();
+    }
+    if (auto st = check_version_continuity(_input_rowsets); !st.ok()) {
+        DCHECK(false) << st;
+        return st;
+    }
+    _filter_input_rowset();
+    if (_input_rowsets.size() <= 1) {
+        return Status::Error<BE_NO_SUITABLE_VERSION>(
+                "insuffient compation input rowset, #rowsets={}", 
_input_rowsets.size());
+    }
+
+    if (_input_rowsets.size() == 2 && _input_rowsets[0]->end_version() == 1) {
+        // The tablet has rowsets [0-1], [2-y], and [0-1] has no data.
+        // In this situation there is no need to do base compaction.
+        return Status::Error<BE_NO_SUITABLE_VERSION>("no suitable versions for 
compaction");
+    }
+
+    // 1. the number of input rowsets (base + cumulative) exceeds base_compaction_min_rowset_num
+    if (_input_rowsets.size() > config::base_compaction_min_rowset_num) {
+        VLOG_NOTICE << "satisfy the base compaction policy. tablet=" << 
_tablet->tablet_id()
+                    << ", num_cumulative_rowsets=" << _input_rowsets.size() - 1
+                    << ", base_compaction_num_cumulative_rowsets="
+                    << config::base_compaction_min_rowset_num;
+        return Status::OK();
+    }
+
+    // 2. the ratio between all input cumulative rowsets and the base rowset reaches the threshold.
+    // `_input_rowsets` has been sorted by end version, so `_input_rowsets[0]` is the base rowset.
+    int64_t base_size = _input_rowsets.front()->data_disk_size();
+    int64_t cumulative_total_size = 0;
+    for (auto it = _input_rowsets.begin() + 1; it != _input_rowsets.end(); 
++it) {
+        cumulative_total_size += (*it)->data_disk_size();
+    }
+
+    double base_cumulative_delta_ratio = 
config::base_compaction_min_data_ratio;
+    if (base_size == 0) {
+        // base_size == 0 means this may be a base version [0-1], which has no data.
+        // Set it to 1 to avoid dividing by zero.
+        base_size = 1;
+    }
+    double cumulative_base_ratio = static_cast<double>(cumulative_total_size) 
/ base_size;
+
+    if (cumulative_base_ratio > base_cumulative_delta_ratio) {
+        VLOG_NOTICE << "satisfy the base compaction policy. tablet=" << 
_tablet->tablet_id()
+                    << ", cumulative_total_size=" << cumulative_total_size
+                    << ", base_size=" << base_size
+                    << ", cumulative_base_ratio=" << cumulative_base_ratio
+                    << ", policy_ratio=" << base_cumulative_delta_ratio;
+        return Status::OK();
+    }
+
+    // 3. the time since the base rowset was created (in seconds) exceeds the threshold
+    int64_t base_creation_time = _input_rowsets[0]->creation_time();
+    int64_t interval_threshold = 
config::base_compaction_interval_seconds_since_last_operation;
+    int64_t interval_since_last_base_compaction = time(nullptr) - 
base_creation_time;
+    if (interval_since_last_base_compaction > interval_threshold) {
+        VLOG_NOTICE << "satisfy the base compaction policy. tablet=" << 
_tablet->tablet_id()
+                    << ", interval_since_last_base_compaction="
+                    << interval_since_last_base_compaction
+                    << ", interval_threshold=" << interval_threshold;
+        return Status::OK();
+    }
+
+    VLOG_NOTICE << "don't satisfy the base compaction policy. tablet=" << 
_tablet->tablet_id()
+                << ", num_cumulative_rowsets=" << _input_rowsets.size() - 1
+                << ", cumulative_base_ratio=" << cumulative_base_ratio
+                << ", interval_since_last_base_compaction=" << 
interval_since_last_base_compaction;
+    return Status::Error<BE_NO_SUITABLE_VERSION>("no suitable versions for 
compaction");
+}
+
+// Runs the base compaction through CloudCompactionMixin::execute_compact() under this
+// compaction's memory tracker, then logs the input/output statistics and the elapsed time.
+// It also sets the compaction state to SUCCESS and updates the base-compaction metrics.
+// Returns the mixin's error unchanged if execution fails.
+Status CloudBaseCompaction::execute_compact() {
+    if (config::enable_base_compaction_idle_sched) {
+        // NOTE(review): presumably lowers this thread's scheduling priority; confirm in util/thread.
+        Thread::set_idle_sched();
+    }
+
+    SCOPED_ATTACH_TASK(_mem_tracker);
+
+    using namespace std::chrono;
+    auto start = steady_clock::now();
+    RETURN_IF_ERROR(CloudCompactionMixin::execute_compact());
+    LOG_INFO("finish CloudBaseCompaction, tablet_id={}, cost={}ms", 
_tablet->tablet_id(),
+             duration_cast<milliseconds>(steady_clock::now() - start).count())
+            .tag("job_id", _uuid)
+            .tag("input_rowsets", _input_rowsets.size())
+            .tag("input_rows", _input_row_num)
+            .tag("input_segments", _input_segments)
+            .tag("input_data_size", _input_rowsets_size)
+            .tag("output_rows", _output_rowset->num_rows())
+            .tag("output_segments", _output_rowset->num_segments())
+            .tag("output_data_size", _output_rowset->data_disk_size());
+
+    // Success is tracked through `_state`.
+    _state = CompactionState::SUCCESS;
+
+    
DorisMetrics::instance()->base_compaction_deltas_total->increment(_input_rowsets.size());
+    
DorisMetrics::instance()->base_compaction_bytes_total->increment(_input_rowsets_size);
+    base_output_size << _output_rowset->data_disk_size();
+
+    return Status::OK();
+}
+
+Status CloudBaseCompaction::modify_rowsets() {

Review Comment:
   warning: function 'modify_rowsets' exceeds recommended size/complexity 
thresholds [readability-function-size]
   ```cpp
   Status CloudBaseCompaction::modify_rowsets() {
                               ^
   ```
   <details>
   <summary>Additional context</summary>
   
   **be/src/cloud/cloud_base_compaction.cpp:231:** 82 lines including 
whitespace and comments (threshold 80)
   ```cpp
   Status CloudBaseCompaction::modify_rowsets() {
                               ^
   ```
   
   </details>
   



##########
be/src/cloud/cloud_base_compaction.cpp:
##########
@@ -0,0 +1,369 @@
+#include "cloud/cloud_base_compaction.h"
+
+#include <boost/container_hash/hash.hpp>
+
+#include "cloud/cloud_meta_mgr.h"
+#include "cloud/config.h"
+#include "common/config.h"
+#include "common/sync_point.h"
+#include "gen_cpp/cloud.pb.h"
+#include "olap/compaction.h"
+#include "olap/task/engine_checksum_task.h"
+#include "service/backend_options.h"
+#include "util/thread.h"
+#include "util/uuid_generator.h"
+
+namespace doris {
+using namespace ErrorCode;
+
+bvar::Adder<uint64_t> base_output_size("base_compaction", "output_size");
+
+CloudBaseCompaction::CloudBaseCompaction(CloudStorageEngine& engine, 
CloudTabletSPtr tablet)
+        : CloudCompactionMixin(engine, std::move(tablet), "BaseCompaction:" + 
std::to_string(tablet->tablet_id())) {
+    auto uuid = UUIDGenerator::instance()->next_uuid();
+    std::stringstream ss;
+    ss << uuid;
+    _uuid = ss.str();
+}
+
+CloudBaseCompaction::~CloudBaseCompaction() = default;
+
+Status CloudBaseCompaction::prepare_compact() {
+    if (_tablet->tablet_state() != TABLET_RUNNING) {
+        return Status::InternalError("invalid tablet state. tablet_id={}", 
_tablet->tablet_id());
+    }
+
+    bool need_sync_tablet = true;
+    {
+        std::shared_lock rlock(_tablet->get_header_lock());
+        // If number of rowsets is equal to approximate_num_rowsets, it is 
very likely that this tablet has been
+        // synchronized with meta-service.
+        if (_tablet->tablet_meta()->all_rs_metas().size() >=
+                    cloud_tablet()->fetch_add_approximate_num_rowsets(0) &&
+            cloud_tablet()->last_sync_time_s > 0) {
+            need_sync_tablet = false;
+        }
+    }
+    if (need_sync_tablet) {
+        RETURN_IF_ERROR(cloud_tablet()->sync_rowsets());
+    }
+
+    RETURN_IF_ERROR(pick_rowsets_to_compact());
+
+    // prepare compaction job
+    cloud::TabletJobInfoPB job;
+    auto idx = job.mutable_idx();
+    idx->set_tablet_id(_tablet->tablet_id());
+    idx->set_table_id(_tablet->table_id());
+    idx->set_index_id(_tablet->index_id());
+    idx->set_partition_id(_tablet->partition_id());
+    auto compaction_job = job.add_compaction();
+    compaction_job->set_id(_uuid);
+    compaction_job->set_initiator(BackendOptions::get_localhost() + ':' +
+                                  
std::to_string(config::heartbeat_service_port));
+    compaction_job->set_type(cloud::TabletCompactionJobPB::BASE);
+    compaction_job->set_base_compaction_cnt(_base_compaction_cnt);
+    compaction_job->set_cumulative_compaction_cnt(_cumulative_compaction_cnt);
+    using namespace std::chrono;
+    int64_t now = 
duration_cast<seconds>(system_clock::now().time_since_epoch()).count();
+    _expiration = now + config::compaction_timeout_seconds;
+    compaction_job->set_expiration(_expiration);
+    compaction_job->set_lease(now + config::lease_compaction_interval_seconds 
* 4);
+    cloud::StartTabletJobResponse resp;
+    //auto st = cloud::meta_mgr()->prepare_tablet_job(job, &resp);
+    auto st = _engine.meta_mgr().prepare_tablet_job(job, &resp);
+    if (!st.ok()) {
+        if (resp.status().code() == cloud::STALE_TABLET_CACHE) {
+            // set last_sync_time to 0 to force sync tablet next time
+            cloud_tablet()->last_sync_time_s = 0;
+        } else if (resp.status().code() == cloud::TABLET_NOT_FOUND) {
+            // tablet not found
+            cloud_tablet()->recycle_cached_data();
+        }
+        return st;
+    }
+
+    for (auto& rs : _input_rowsets) {
+        _input_row_num += rs->num_rows();
+        _input_segments += rs->num_segments();
+        _input_rowsets_size += rs->data_disk_size();
+    }
+    LOG_INFO("start CloudBaseCompaction, tablet_id={}, range=[{}-{}]", 
_tablet->tablet_id(),
+             _input_rowsets.front()->start_version(), 
_input_rowsets.back()->end_version())
+            .tag("job_id", _uuid)
+            .tag("input_rowsets", _input_rowsets.size())
+            .tag("input_rows", _input_row_num)
+            .tag("input_segments", _input_segments)
+            .tag("input_data_size", _input_rowsets_size);
+    return st;
+}
+
+void CloudBaseCompaction::_filter_input_rowset() {

Review Comment:
   warning: method '_filter_input_rowset' can be made static [readability-convert-member-functions-to-static]
   
   be/src/cloud/cloud_base_compaction.h:29:
   ```diff
   -     void _filter_input_rowset();
   +     static void _filter_input_rowset();
   ```
   



##########
be/src/cloud/cloud_cumulative_compaction_policy.h:
##########
@@ -0,0 +1,73 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <stddef.h>
+#include <stdint.h>
+
+#include <memory>
+#include <string>
+#include <vector>
+
+#include "common/config.h"
+#include "cloud/cloud_tablet.h"
+#include "olap/rowset/rowset.h"
+#include "olap/rowset/rowset_meta.h"
+
+namespace doris {
+
+class Tablet;
+struct Version;
+
+class CloudSizeBasedCumulativeCompactionPolicy {
+public:
+    CloudSizeBasedCumulativeCompactionPolicy(
+            int64_t promotion_size = config::compaction_promotion_size_mbytes 
* 1024 * 1024,
+            double promotion_ratio = config::compaction_promotion_ratio,
+            int64_t promotion_min_size = 
config::compaction_promotion_min_size_mbytes * 1024 * 1024,
+            int64_t compaction_min_size = config::compaction_min_size_mbytes * 
1024 * 1024);
+
+    ~CloudSizeBasedCumulativeCompactionPolicy() {}
+
+    int64_t new_cumulative_point(CloudTablet* tablet, const RowsetSharedPtr& 
output_rowset,
+                                 Version& last_delete_version,
+                                 int64_t last_cumulative_point);
+
+    int pick_input_rowsets(CloudTablet* tablet, const 
std::vector<RowsetSharedPtr>& candidate_rowsets,
+                           const int64_t max_compaction_score, const int64_t 
min_compaction_score,

Review Comment:
   warning: parameter 'min_compaction_score' is const-qualified in the function declaration; const-qualification of parameters only has an effect in function definitions [readability-avoid-const-params-in-decls]
   
   ```suggestion
                              const int64_t max_compaction_score, int64_t min_compaction_score,
   ```
   



##########
be/src/cloud/cloud_storage_engine.cpp:
##########
@@ -190,4 +234,418 @@
     }
 }
 
+void CloudStorageEngine::get_cumu_compaction(
+        int64_t tablet_id, 
std::vector<std::shared_ptr<CloudCumulativeCompaction>>& res) {
+    std::lock_guard lock(_compaction_mtx);
+    if (auto it = _submitted_cumu_compactions.find(tablet_id);
+        it != _submitted_cumu_compactions.end()) {
+        res = it->second;
+    }
+}
+
+void CloudStorageEngine::_adjust_compaction_thread_num() {

Review Comment:
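   warning: method '_adjust_compaction_thread_num' can be made static [readability-convert-member-functions-to-static]
   
   Reviewer note: this looks like a false positive. The body of `_adjust_compaction_thread_num` in this PR reads and updates `_base_compaction_thread_pool` and `_cumu_compaction_thread_pool`, which appear to be engine members, so the method depends on instance state and should stay non-static.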
   
   be/src/cloud/cloud_storage_engine.h:83:
   ```diff
   -     void _adjust_compaction_thread_num();
   +     static void _adjust_compaction_thread_num();
   ```
   



##########
be/src/cloud/cloud_cumulative_compaction_policy.h:
##########
@@ -0,0 +1,73 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <stddef.h>

Review Comment:
   warning: inclusion of deprecated C++ header 'stddef.h'; consider using 
'cstddef' instead [modernize-deprecated-headers]
   
   ```suggestion
   #include <cstddef>
   ```
   



##########
be/src/cloud/cloud_cumulative_compaction_policy.h:
##########
@@ -0,0 +1,73 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <stddef.h>
+#include <stdint.h>
+
+#include <memory>
+#include <string>
+#include <vector>
+
+#include "common/config.h"
+#include "cloud/cloud_tablet.h"
+#include "olap/rowset/rowset.h"
+#include "olap/rowset/rowset_meta.h"
+
+namespace doris {
+
+class Tablet;
+struct Version;
+
+class CloudSizeBasedCumulativeCompactionPolicy {
+public:
+    CloudSizeBasedCumulativeCompactionPolicy(
+            int64_t promotion_size = config::compaction_promotion_size_mbytes 
* 1024 * 1024,
+            double promotion_ratio = config::compaction_promotion_ratio,
+            int64_t promotion_min_size = 
config::compaction_promotion_min_size_mbytes * 1024 * 1024,
+            int64_t compaction_min_size = config::compaction_min_size_mbytes * 
1024 * 1024);
+
+    ~CloudSizeBasedCumulativeCompactionPolicy() {}

Review Comment:
   warning: use '= default' to define a trivial destructor [modernize-use-equals-default]
   
   ```suggestion
       ~CloudSizeBasedCumulativeCompactionPolicy() = default;
   ```
   



##########
be/src/cloud/cloud_storage_engine.cpp:
##########
@@ -190,4 +234,418 @@ void CloudStorageEngine::_sync_tablets_thread_callback() {
     }
 }
 
+void CloudStorageEngine::get_cumu_compaction(

Review Comment:
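   warning: method 'get_cumu_compaction' can be made static [readability-convert-member-functions-to-static]
   
   Reviewer note: this looks like a false positive. The method locks `_compaction_mtx` and reads `_submitted_cumu_compactions`, so it depends on instance state. In addition, `static` may not appear on an out-of-class member function definition, so the suggestion below would not compile if applied as written.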
   
   ```suggestion
   static void CloudStorageEngine::get_cumu_compaction(
   ```
   



##########
be/src/cloud/cloud_storage_engine.cpp:
##########
@@ -190,4 +234,418 @@
     }
 }
 
+void CloudStorageEngine::get_cumu_compaction(
+        int64_t tablet_id, 
std::vector<std::shared_ptr<CloudCumulativeCompaction>>& res) {
+    std::lock_guard lock(_compaction_mtx);
+    if (auto it = _submitted_cumu_compactions.find(tablet_id);
+        it != _submitted_cumu_compactions.end()) {
+        res = it->second;
+    }
+}
+
+void CloudStorageEngine::_adjust_compaction_thread_num() {
+    int base_thread_num = get_base_thread_num();
+    if (_base_compaction_thread_pool->max_threads() != base_thread_num) {
+        int old_max_threads = _base_compaction_thread_pool->max_threads();
+        Status status = 
_base_compaction_thread_pool->set_max_threads(base_thread_num);
+        if (status.ok()) {
+            VLOG_NOTICE << "update base compaction thread pool max_threads 
from " << old_max_threads
+                        << " to " << base_thread_num;
+        }
+    }
+    if (_base_compaction_thread_pool->min_threads() != base_thread_num) {
+        int old_min_threads = _base_compaction_thread_pool->min_threads();
+        Status status = 
_base_compaction_thread_pool->set_min_threads(base_thread_num);
+        if (status.ok()) {
+            VLOG_NOTICE << "update base compaction thread pool min_threads 
from " << old_min_threads
+                        << " to " << base_thread_num;
+        }
+    }
+
+    int cumu_thread_num = get_cumu_thread_num();
+    if (_cumu_compaction_thread_pool->max_threads() != cumu_thread_num) {
+        int old_max_threads = _cumu_compaction_thread_pool->max_threads();
+        Status status = 
_cumu_compaction_thread_pool->set_max_threads(cumu_thread_num);
+        if (status.ok()) {
+            VLOG_NOTICE << "update cumu compaction thread pool max_threads 
from " << old_max_threads
+                        << " to " << cumu_thread_num;
+        }
+    }
+    if (_cumu_compaction_thread_pool->min_threads() != cumu_thread_num) {
+        int old_min_threads = _cumu_compaction_thread_pool->min_threads();
+        Status status = 
_cumu_compaction_thread_pool->set_min_threads(cumu_thread_num);
+        if (status.ok()) {
+            VLOG_NOTICE << "update cumu compaction thread pool min_threads 
from " << old_min_threads
+                        << " to " << cumu_thread_num;
+        }
+    }
+}
+
+void CloudStorageEngine::_compaction_tasks_producer_callback() {
+    LOG(INFO) << "try to start compaction producer process!";
+
+    int round = 0;
+    CompactionType compaction_type;
+
+    // Used to record the time when the score metric was last updated.
+    // The update of the score metric is accompanied by the logic of selecting 
the tablet.
+    // If there is no slot available, the logic of selecting the tablet will 
be terminated,
+    // which causes the score metric update to be terminated.
+    // In order to avoid this situation, we need to update the score regularly.
+    int64_t last_cumulative_score_update_time = 0;
+    int64_t last_base_score_update_time = 0;
+    static const int64_t check_score_interval_ms = 5000; // 5 secs
+
+    int64_t interval = config::generate_compaction_tasks_interval_ms;
+    do {
+        if (!config::disable_auto_compaction) {
+            _adjust_compaction_thread_num();
+
+            bool check_score = false;
+            int64_t cur_time = UnixMillis();
+            if (round < 
config::cumulative_compaction_rounds_for_each_base_compaction_round) {
+                compaction_type = CompactionType::CUMULATIVE_COMPACTION;
+                round++;
+                if (cur_time - last_cumulative_score_update_time >= 
check_score_interval_ms) {
+                    check_score = true;
+                    last_cumulative_score_update_time = cur_time;
+                }
+            } else {
+                compaction_type = CompactionType::BASE_COMPACTION;
+                round = 0;
+                if (cur_time - last_base_score_update_time >= 
check_score_interval_ms) {
+                    check_score = true;
+                    last_base_score_update_time = cur_time;
+                }
+            }
+            std::unique_ptr<ThreadPool>& thread_pool =
+                    (compaction_type == CompactionType::CUMULATIVE_COMPACTION)
+                            ? _cumu_compaction_thread_pool
+                            : _base_compaction_thread_pool;
+            VLOG_CRITICAL << "compaction thread pool. type: "
+                          << (compaction_type == 
CompactionType::CUMULATIVE_COMPACTION ? "CUMU"
+                                                                               
        : "BASE")
+                          << ", num_threads: " << thread_pool->num_threads()
+                          << ", num_threads_pending_start: "
+                          << thread_pool->num_threads_pending_start()
+                          << ", num_active_threads: " << 
thread_pool->num_active_threads()
+                          << ", max_threads: " << thread_pool->max_threads()
+                          << ", min_threads: " << thread_pool->min_threads()
+                          << ", num_total_queued_tasks: " << 
thread_pool->get_queue_size();
+            std::vector<CloudTabletSPtr> tablets_compaction =
+                    _generate_cloud_compaction_tasks(compaction_type, 
check_score);
+
+            /// Regardless of whether the tablet is submitted for compaction 
or not,
+            /// we need to call 'reset_compaction' to clean up the 
base_compaction or cumulative_compaction objects
+            /// in the tablet, because these two objects store the tablet's 
own shared_ptr.
+            /// If it is not cleaned up, the reference count of the tablet 
will always be greater than 1,
+            /// thus cannot be collected by the garbage collector. 
(TabletManager::start_trash_sweep)
+            for (const auto& tablet : tablets_compaction) {
+                Status st = submit_compaction_task(tablet, compaction_type);
+                if (st.ok()) continue;
+                if ((!st.is<ErrorCode::BE_NO_SUITABLE_VERSION>() &&
+                     !st.is<ErrorCode::CUMULATIVE_NO_SUITABLE_VERSION>()) ||
+                    VLOG_DEBUG_IS_ON) {
+                    LOG(WARNING) << "failed to submit compaction task for 
tablet: "
+                                 << tablet->tablet_id() << ", err: " << st;
+                }
+            }
+            interval = config::generate_compaction_tasks_interval_ms;
+        } else {
+            interval = config::check_auto_compaction_interval_seconds * 1000;
+        }
+    } while 
(!_stop_background_threads_latch.wait_for(std::chrono::milliseconds(interval)));
+}
+
+std::vector<CloudTabletSPtr> 
CloudStorageEngine::_generate_cloud_compaction_tasks(
+        CompactionType compaction_type, bool check_score) {
+    std::vector<std::shared_ptr<CloudTablet>> tablets_compaction;
+
+    int64_t max_compaction_score = 0;
+    std::unordered_set<int64_t> tablet_preparing_cumu_compaction;
+    std::unordered_map<int64_t, 
std::vector<std::shared_ptr<CloudCumulativeCompaction>>>
+            submitted_cumu_compactions;
+    std::unordered_map<int64_t, std::shared_ptr<CloudBaseCompaction>> 
submitted_base_compactions;
+    std::unordered_map<int64_t, std::shared_ptr<CloudFullCompaction>> 
submitted_full_compactions;
+    {
+        std::lock_guard lock(_compaction_mtx);
+        tablet_preparing_cumu_compaction = _tablet_preparing_cumu_compaction;
+        submitted_cumu_compactions = _submitted_cumu_compactions;
+        submitted_base_compactions = _submitted_base_compactions;
+        submitted_full_compactions = _submitted_full_compactions;
+    }
+
+    bool need_pick_tablet = true;
+    int thread_per_disk =
+            config::compaction_task_num_per_fast_disk; // all disks are fast 
in cloud mode
+    int num_cumu =
+            std::accumulate(submitted_cumu_compactions.begin(), 
submitted_cumu_compactions.end(), 0,
+                            [](int a, auto& b) { return a + b.second.size(); 
});
+    int num_base = submitted_base_compactions.size() + 
submitted_full_compactions.size();
+    int n = thread_per_disk - num_cumu - num_base;
+    if (compaction_type == CompactionType::BASE_COMPACTION) {
+        // We need to reserve at least one thread for cumulative compaction,
+        // because base compactions may take too long to complete, which may
+        // leads to "too many rowsets" error.
+        int base_n = std::min(config::max_base_compaction_task_num_per_disk, 
thread_per_disk - 1) -
+                     num_base;
+        n = std::min(base_n, n);
+    }
+    if (n <= 0) { // No threads available
+        if (!check_score) return tablets_compaction;
+        need_pick_tablet = false;
+        n = 0;
+    }
+
+    // Return true for skipping compaction
+    std::function<bool(CloudTablet*)> filter_out;
+    if (compaction_type == CompactionType::BASE_COMPACTION) {
+        filter_out = [&submitted_base_compactions, 
&submitted_full_compactions](CloudTablet* t) {
+            return !!submitted_base_compactions.count(t->tablet_id()) ||
+                   !!submitted_full_compactions.count(t->tablet_id()) ||
+                   t->tablet_state() != TABLET_RUNNING;
+        };
+    } else if (config::enable_parallel_cumu_compaction) {
+        filter_out = [&tablet_preparing_cumu_compaction](CloudTablet* t) {
+            return !!tablet_preparing_cumu_compaction.count(t->tablet_id()) ||
+                   t->tablet_state() != TABLET_RUNNING;
+        };
+    } else {
+        filter_out = [&tablet_preparing_cumu_compaction, 
&submitted_cumu_compactions](CloudTablet* t) {
+            return !!tablet_preparing_cumu_compaction.count(t->tablet_id()) ||
+                   !!submitted_cumu_compactions.count(t->tablet_id()) ||
+                   t->tablet_state() != TABLET_RUNNING;
+        };
+    }
+
+    // Even if need_pick_tablet is false, we still need to call 
find_best_tablet_to_compaction(),
+    // So that we can update the max_compaction_score metric.
+    do {
+        std::vector<CloudTabletSPtr> tablets;
+        auto st = tablet_mgr().get_topn_tablets_to_compact(n, compaction_type, 
filter_out,
+                                                                   &tablets, 
&max_compaction_score);
+        if (!st.ok()) {
+            LOG(WARNING) << "failed to get tablets to compact, err=" << st;
+            break;
+        }
+        if (!need_pick_tablet) break;

Review Comment:
   warning: statement should be inside braces [readability-braces-around-statements]
   
   ```suggestion
           if (!need_pick_tablet) {
               break;
           }
   ```
   



##########
be/src/cloud/cloud_cumulative_compaction_policy.h:
##########
@@ -0,0 +1,73 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <stddef.h>
+#include <stdint.h>
+
+#include <memory>
+#include <string>
+#include <vector>
+
+#include "common/config.h"
+#include "cloud/cloud_tablet.h"
+#include "olap/rowset/rowset.h"
+#include "olap/rowset/rowset_meta.h"
+
+namespace doris {
+
+class Tablet;
+struct Version;
+
+class CloudSizeBasedCumulativeCompactionPolicy {
+public:
+    CloudSizeBasedCumulativeCompactionPolicy(
+            int64_t promotion_size = config::compaction_promotion_size_mbytes 
* 1024 * 1024,
+            double promotion_ratio = config::compaction_promotion_ratio,
+            int64_t promotion_min_size = 
config::compaction_promotion_min_size_mbytes * 1024 * 1024,
+            int64_t compaction_min_size = config::compaction_min_size_mbytes * 
1024 * 1024);
+
+    ~CloudSizeBasedCumulativeCompactionPolicy() {}
+
+    int64_t new_cumulative_point(CloudTablet* tablet, const RowsetSharedPtr& 
output_rowset,
+                                 Version& last_delete_version,
+                                 int64_t last_cumulative_point);
+
+    int pick_input_rowsets(CloudTablet* tablet, const 
std::vector<RowsetSharedPtr>& candidate_rowsets,
+                           const int64_t max_compaction_score, const int64_t 
min_compaction_score,
+                           std::vector<RowsetSharedPtr>* input_rowsets,
+                           Version* last_delete_version, size_t* 
compaction_score,
+                           bool allow_delete = false);
+
+private:
+    int64_t _level_size(const int64_t size);

Review Comment:
   warning: parameter 'size' is const-qualified in the function declaration; const-qualification of parameters only has an effect in function definitions [readability-avoid-const-params-in-decls]
   
   ```suggestion
       int64_t _level_size(int64_t size);
   ```
   



##########
be/src/cloud/cloud_storage_engine.cpp:
##########
@@ -190,4 +234,418 @@
     }
 }
 
+void CloudStorageEngine::get_cumu_compaction(
+        int64_t tablet_id, 
std::vector<std::shared_ptr<CloudCumulativeCompaction>>& res) {
+    std::lock_guard lock(_compaction_mtx);
+    if (auto it = _submitted_cumu_compactions.find(tablet_id);
+        it != _submitted_cumu_compactions.end()) {
+        res = it->second;
+    }
+}
+
+void CloudStorageEngine::_adjust_compaction_thread_num() {
+    int base_thread_num = get_base_thread_num();
+    if (_base_compaction_thread_pool->max_threads() != base_thread_num) {
+        int old_max_threads = _base_compaction_thread_pool->max_threads();
+        Status status = 
_base_compaction_thread_pool->set_max_threads(base_thread_num);
+        if (status.ok()) {
+            VLOG_NOTICE << "update base compaction thread pool max_threads 
from " << old_max_threads
+                        << " to " << base_thread_num;
+        }
+    }
+    if (_base_compaction_thread_pool->min_threads() != base_thread_num) {
+        int old_min_threads = _base_compaction_thread_pool->min_threads();
+        Status status = 
_base_compaction_thread_pool->set_min_threads(base_thread_num);
+        if (status.ok()) {
+            VLOG_NOTICE << "update base compaction thread pool min_threads 
from " << old_min_threads
+                        << " to " << base_thread_num;
+        }
+    }
+
+    int cumu_thread_num = get_cumu_thread_num();
+    if (_cumu_compaction_thread_pool->max_threads() != cumu_thread_num) {
+        int old_max_threads = _cumu_compaction_thread_pool->max_threads();
+        Status status = 
_cumu_compaction_thread_pool->set_max_threads(cumu_thread_num);
+        if (status.ok()) {
+            VLOG_NOTICE << "update cumu compaction thread pool max_threads 
from " << old_max_threads
+                        << " to " << cumu_thread_num;
+        }
+    }
+    if (_cumu_compaction_thread_pool->min_threads() != cumu_thread_num) {
+        int old_min_threads = _cumu_compaction_thread_pool->min_threads();
+        Status status = 
_cumu_compaction_thread_pool->set_min_threads(cumu_thread_num);
+        if (status.ok()) {
+            VLOG_NOTICE << "update cumu compaction thread pool min_threads 
from " << old_min_threads
+                        << " to " << cumu_thread_num;
+        }
+    }
+}
+
+void CloudStorageEngine::_compaction_tasks_producer_callback() {
+    LOG(INFO) << "try to start compaction producer process!";
+
+    int round = 0;
+    CompactionType compaction_type;
+
+    // Used to record the time when the score metric was last updated.
+    // The update of the score metric is accompanied by the logic of selecting 
the tablet.
+    // If there is no slot available, the logic of selecting the tablet will 
be terminated,
+    // which causes the score metric update to be terminated.
+    // In order to avoid this situation, we need to update the score regularly.
+    int64_t last_cumulative_score_update_time = 0;
+    int64_t last_base_score_update_time = 0;
+    static const int64_t check_score_interval_ms = 5000; // 5 secs
+
+    int64_t interval = config::generate_compaction_tasks_interval_ms;
+    do {
+        if (!config::disable_auto_compaction) {
+            _adjust_compaction_thread_num();
+
+            bool check_score = false;
+            int64_t cur_time = UnixMillis();
+            if (round < 
config::cumulative_compaction_rounds_for_each_base_compaction_round) {
+                compaction_type = CompactionType::CUMULATIVE_COMPACTION;
+                round++;
+                if (cur_time - last_cumulative_score_update_time >= 
check_score_interval_ms) {
+                    check_score = true;
+                    last_cumulative_score_update_time = cur_time;
+                }
+            } else {
+                compaction_type = CompactionType::BASE_COMPACTION;
+                round = 0;
+                if (cur_time - last_base_score_update_time >= 
check_score_interval_ms) {
+                    check_score = true;
+                    last_base_score_update_time = cur_time;
+                }
+            }
+            std::unique_ptr<ThreadPool>& thread_pool =
+                    (compaction_type == CompactionType::CUMULATIVE_COMPACTION)
+                            ? _cumu_compaction_thread_pool
+                            : _base_compaction_thread_pool;
+            VLOG_CRITICAL << "compaction thread pool. type: "
+                          << (compaction_type == 
CompactionType::CUMULATIVE_COMPACTION ? "CUMU"
+                                                                               
        : "BASE")
+                          << ", num_threads: " << thread_pool->num_threads()
+                          << ", num_threads_pending_start: "
+                          << thread_pool->num_threads_pending_start()
+                          << ", num_active_threads: " << 
thread_pool->num_active_threads()
+                          << ", max_threads: " << thread_pool->max_threads()
+                          << ", min_threads: " << thread_pool->min_threads()
+                          << ", num_total_queued_tasks: " << 
thread_pool->get_queue_size();
+            std::vector<CloudTabletSPtr> tablets_compaction =
+                    _generate_cloud_compaction_tasks(compaction_type, 
check_score);
+
+            /// Regardless of whether the tablet is submitted for compaction 
or not,
+            /// we need to call 'reset_compaction' to clean up the 
base_compaction or cumulative_compaction objects
+            /// in the tablet, because these two objects store the tablet's 
own shared_ptr.
+            /// If it is not cleaned up, the reference count of the tablet 
will always be greater than 1,
+            /// thus cannot be collected by the garbage collector. 
(TabletManager::start_trash_sweep)
+            for (const auto& tablet : tablets_compaction) {
+                Status st = submit_compaction_task(tablet, compaction_type);
+                if (st.ok()) continue;
+                if ((!st.is<ErrorCode::BE_NO_SUITABLE_VERSION>() &&
+                     !st.is<ErrorCode::CUMULATIVE_NO_SUITABLE_VERSION>()) ||
+                    VLOG_DEBUG_IS_ON) {
+                    LOG(WARNING) << "failed to submit compaction task for 
tablet: "
+                                 << tablet->tablet_id() << ", err: " << st;
+                }
+            }
+            interval = config::generate_compaction_tasks_interval_ms;
+        } else {
+            interval = config::check_auto_compaction_interval_seconds * 1000;
+        }
+    } while 
(!_stop_background_threads_latch.wait_for(std::chrono::milliseconds(interval)));
+}
+
+std::vector<CloudTabletSPtr> 
CloudStorageEngine::_generate_cloud_compaction_tasks(

Review Comment:
   warning: function '_generate_cloud_compaction_tasks' exceeds recommended size/complexity thresholds [readability-function-size]
   ```cpp
   std::vector<CloudTabletSPtr> CloudStorageEngine::_generate_cloud_compaction_tasks(
                                                    ^
   ```
   <details>
   <summary>Additional context</summary>
   
   **be/src/cloud/cloud_storage_engine.cpp:359:** 85 lines including whitespace and comments (threshold 80)
   ```cpp
   std::vector<CloudTabletSPtr> CloudStorageEngine::_generate_cloud_compaction_tasks(
                                                    ^
   ```
   
   </details>
   



##########
be/src/olap/compaction.cpp:
##########
@@ -844,4 +847,80 @@ void Compaction::_load_segment_to_cache() {
     }
 }
 
+void CloudCompactionMixin::build_basic_info() {

Review Comment:
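   warning: method 'build_basic_info' can be made static [readability-convert-member-functions-to-static]
   
   Reviewer note: this looks like a false positive. The definition of `build_basic_info` in this PR assigns `_output_version`, `_newest_write_timestamp` and `_cur_tablet_schema`, and reads `_input_rowsets` and `_tablet`, so it depends on instance state and should stay non-static.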
   
   be/src/olap/compaction.h:184:
   ```diff
   -     void build_basic_info();
   +     static void build_basic_info();
   ```
   



##########
be/src/olap/compaction.cpp:
##########
@@ -844,4 +847,80 @@
     }
 }
 
+void CloudCompactionMixin::build_basic_info() {
+    _output_version =
+            Version(_input_rowsets.front()->start_version(), 
_input_rowsets.back()->end_version());
+
+    _newest_write_timestamp = _input_rowsets.back()->newest_write_timestamp();
+
+    std::vector<RowsetMetaSharedPtr> rowset_metas(_input_rowsets.size());
+    std::transform(_input_rowsets.begin(), _input_rowsets.end(), 
rowset_metas.begin(),
+                   [](const RowsetSharedPtr& rowset) { return 
rowset->rowset_meta(); });
+    _cur_tablet_schema = 
_tablet->tablet_schema_with_merged_max_schema_version(rowset_metas);
+}
+
+int64_t CloudCompactionMixin::get_compaction_permits() {

Review Comment:
   warning: method 'get_compaction_permits' can be made static [readability-convert-member-functions-to-static]
   
   be/src/olap/compaction.h:188:
   ```diff
   -     int64_t get_compaction_permits();
   +     static int64_t get_compaction_permits();
   ```
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to