github-actions[bot] commented on code in PR #31215:
URL: https://github.com/apache/doris/pull/31215#discussion_r1497168073
##########
be/src/cloud/cloud_base_compaction.cpp:
##########
@@ -0,0 +1,370 @@
+#include "cloud/cloud_base_compaction.h"
+
+#include <boost/container_hash/hash.hpp>
+
+#include "cloud/cloud_meta_mgr.h"
+#include "cloud/config.h"
+#include "common/config.h"
+#include "common/sync_point.h"
+#include "gen_cpp/cloud.pb.h"
+#include "olap/compaction.h"
+#include "olap/task/engine_checksum_task.h"
+#include "service/backend_options.h"
+#include "util/thread.h"
+#include "util/uuid_generator.h"
+
+namespace doris {
+using namespace ErrorCode;
+
+bvar::Adder<uint64_t> base_output_size("base_compaction", "output_size");
+
+CloudBaseCompaction::CloudBaseCompaction(CloudStorageEngine& engine,
CloudTabletSPtr tablet)
+ : CloudCompactionMixin(engine, std::move(tablet),
+ "BaseCompaction:" +
std::to_string(tablet->tablet_id())) {
+ auto uuid = UUIDGenerator::instance()->next_uuid();
+ std::stringstream ss;
+ ss << uuid;
+ _uuid = ss.str();
+}
+
+CloudBaseCompaction::~CloudBaseCompaction() = default;
+
+Status CloudBaseCompaction::prepare_compact() {
+ if (_tablet->tablet_state() != TABLET_RUNNING) {
+ return Status::InternalError("invalid tablet state. tablet_id={}",
_tablet->tablet_id());
+ }
+
+ bool need_sync_tablet = true;
+ {
+ std::shared_lock rlock(_tablet->get_header_lock());
+ // If number of rowsets is equal to approximate_num_rowsets, it is
very likely that this tablet has been
+ // synchronized with meta-service.
+ if (_tablet->tablet_meta()->all_rs_metas().size() >=
+ cloud_tablet()->fetch_add_approximate_num_rowsets(0) &&
+ cloud_tablet()->last_sync_time_s > 0) {
+ need_sync_tablet = false;
+ }
+ }
+ if (need_sync_tablet) {
+ RETURN_IF_ERROR(cloud_tablet()->sync_rowsets());
+ }
+
+ RETURN_IF_ERROR(pick_rowsets_to_compact());
+
+ // prepare compaction job
+ cloud::TabletJobInfoPB job;
+ auto idx = job.mutable_idx();
+ idx->set_tablet_id(_tablet->tablet_id());
+ idx->set_table_id(_tablet->table_id());
+ idx->set_index_id(_tablet->index_id());
+ idx->set_partition_id(_tablet->partition_id());
+ auto compaction_job = job.add_compaction();
+ compaction_job->set_id(_uuid);
+ compaction_job->set_initiator(BackendOptions::get_localhost() + ':' +
+
std::to_string(config::heartbeat_service_port));
+ compaction_job->set_type(cloud::TabletCompactionJobPB::BASE);
+ compaction_job->set_base_compaction_cnt(_base_compaction_cnt);
+ compaction_job->set_cumulative_compaction_cnt(_cumulative_compaction_cnt);
+ using namespace std::chrono;
+ int64_t now =
duration_cast<seconds>(system_clock::now().time_since_epoch()).count();
+ _expiration = now + config::compaction_timeout_seconds;
+ compaction_job->set_expiration(_expiration);
+ compaction_job->set_lease(now + config::lease_compaction_interval_seconds
* 4);
+ cloud::StartTabletJobResponse resp;
+ //auto st = cloud::meta_mgr()->prepare_tablet_job(job, &resp);
+ auto st = _engine.meta_mgr().prepare_tablet_job(job, &resp);
+ if (!st.ok()) {
+ if (resp.status().code() == cloud::STALE_TABLET_CACHE) {
+ // set last_sync_time to 0 to force sync tablet next time
+ cloud_tablet()->last_sync_time_s = 0;
+ } else if (resp.status().code() == cloud::TABLET_NOT_FOUND) {
+ // tablet not found
+ cloud_tablet()->recycle_cached_data();
+ }
+ return st;
+ }
+
+ for (auto& rs : _input_rowsets) {
+ _input_row_num += rs->num_rows();
+ _input_segments += rs->num_segments();
+ _input_rowsets_size += rs->data_disk_size();
+ }
+ LOG_INFO("start CloudBaseCompaction, tablet_id={}, range=[{}-{}]",
_tablet->tablet_id(),
+ _input_rowsets.front()->start_version(),
_input_rowsets.back()->end_version())
+ .tag("job_id", _uuid)
+ .tag("input_rowsets", _input_rowsets.size())
+ .tag("input_rows", _input_row_num)
+ .tag("input_segments", _input_segments)
+ .tag("input_data_size", _input_rowsets_size);
+ return st;
+}
+
+void CloudBaseCompaction::_filter_input_rowset() {
+ // if dup_key and no delete predicate
+ // we skip big files to save resources
+ if (_tablet->keys_type() != KeysType::DUP_KEYS) {
+ return;
+ }
+ for (auto& rs : _input_rowsets) {
+ if (rs->rowset_meta()->has_delete_predicate()) {
+ return;
+ }
+ }
+ int64_t max_size = config::base_compaction_dup_key_max_file_size_mbytes *
1024 * 1024;
+ // first find a proper rowset for start
+ auto rs_iter = _input_rowsets.begin();
+ while (rs_iter != _input_rowsets.end()) {
+ if ((*rs_iter)->rowset_meta()->total_disk_size() >= max_size) {
+ rs_iter = _input_rowsets.erase(rs_iter);
+ } else {
+ break;
+ }
+ }
+}
+
+Status CloudBaseCompaction::pick_rowsets_to_compact() {
+ _input_rowsets.clear();
+ {
+ std::shared_lock rlock(_tablet->get_header_lock());
+ _base_compaction_cnt = cloud_tablet()->base_compaction_cnt();
+ _cumulative_compaction_cnt =
cloud_tablet()->cumulative_compaction_cnt();
+ _input_rowsets =
cloud_tablet()->pick_candidate_rowsets_to_base_compaction();
+ }
+ if (auto st = check_version_continuity(_input_rowsets); !st.ok()) {
+ DCHECK(false) << st;
+ return st;
+ }
+ _filter_input_rowset();
+ if (_input_rowsets.size() <= 1) {
+ return Status::Error<BE_NO_SUITABLE_VERSION>(
+ "insuffient compation input rowset, #rowsets={}",
_input_rowsets.size());
+ }
+
+ if (_input_rowsets.size() == 2 && _input_rowsets[0]->end_version() == 1) {
+ // the tablet is with rowset: [0-1], [2-y]
+ // and [0-1] has no data. in this situation, no need to do base
compaction.
+ return Status::Error<BE_NO_SUITABLE_VERSION>("no suitable versions for
compaction");
+ }
+
+ // 1. cumulative rowset must reach base_compaction_min_rowset_num threshold
+ if (_input_rowsets.size() > config::base_compaction_min_rowset_num) {
+ VLOG_NOTICE << "satisfy the base compaction policy. tablet=" <<
_tablet->tablet_id()
+ << ", num_cumulative_rowsets=" << _input_rowsets.size() - 1
+ << ", base_compaction_num_cumulative_rowsets="
+ << config::base_compaction_min_rowset_num;
+ return Status::OK();
+ }
+
+ // 2. the ratio between base rowset and all input cumulative rowsets
reaches the threshold
+ // `_input_rowsets` has been sorted by end version, so we consider
`_input_rowsets[0]` is the base rowset.
+ int64_t base_size = _input_rowsets.front()->data_disk_size();
+ int64_t cumulative_total_size = 0;
+ for (auto it = _input_rowsets.begin() + 1; it != _input_rowsets.end();
++it) {
+ cumulative_total_size += (*it)->data_disk_size();
+ }
+
+ double base_cumulative_delta_ratio =
config::base_compaction_min_data_ratio;
+ if (base_size == 0) {
+ // base_size == 0 means this may be a base version [0-1], which has no
data.
+ // set to 1 to void divide by zero
+ base_size = 1;
+ }
+ double cumulative_base_ratio = static_cast<double>(cumulative_total_size)
/ base_size;
+
+ if (cumulative_base_ratio > base_cumulative_delta_ratio) {
+ VLOG_NOTICE << "satisfy the base compaction policy. tablet=" <<
_tablet->tablet_id()
+ << ", cumulative_total_size=" << cumulative_total_size
+ << ", base_size=" << base_size
+ << ", cumulative_base_ratio=" << cumulative_base_ratio
+ << ", policy_ratio=" << base_cumulative_delta_ratio;
+ return Status::OK();
+ }
+
+ // 3. the interval since last base compaction reaches the threshold
+ int64_t base_creation_time = _input_rowsets[0]->creation_time();
+ int64_t interval_threshold =
config::base_compaction_interval_seconds_since_last_operation;
+ int64_t interval_since_last_base_compaction = time(nullptr) -
base_creation_time;
+ if (interval_since_last_base_compaction > interval_threshold) {
+ VLOG_NOTICE << "satisfy the base compaction policy. tablet=" <<
_tablet->tablet_id()
+ << ", interval_since_last_base_compaction="
+ << interval_since_last_base_compaction
+ << ", interval_threshold=" << interval_threshold;
+ return Status::OK();
+ }
+
+ VLOG_NOTICE << "don't satisfy the base compaction policy. tablet=" <<
_tablet->tablet_id()
+ << ", num_cumulative_rowsets=" << _input_rowsets.size() - 1
+ << ", cumulative_base_ratio=" << cumulative_base_ratio
+ << ", interval_since_last_base_compaction=" <<
interval_since_last_base_compaction;
+ return Status::Error<BE_NO_SUITABLE_VERSION>("no suitable versions for
compaction");
+}
+
+Status CloudBaseCompaction::execute_compact() {
+ if (config::enable_base_compaction_idle_sched) {
+ Thread::set_idle_sched();
+ }
+
+ SCOPED_ATTACH_TASK(_mem_tracker);
+
+ using namespace std::chrono;
+ auto start = steady_clock::now();
+ RETURN_IF_ERROR(CloudCompactionMixin::execute_compact());
+ LOG_INFO("finish CloudBaseCompaction, tablet_id={}, cost={}ms",
_tablet->tablet_id(),
+ duration_cast<milliseconds>(steady_clock::now() - start).count())
+ .tag("job_id", _uuid)
+ .tag("input_rowsets", _input_rowsets.size())
+ .tag("input_rows", _input_row_num)
+ .tag("input_segments", _input_segments)
+ .tag("input_data_size", _input_rowsets_size)
+ .tag("output_rows", _output_rowset->num_rows())
+ .tag("output_segments", _output_rowset->num_segments())
+ .tag("output_data_size", _output_rowset->data_disk_size());
+
+ //_compaction_succeed = true;
+ _state = CompactionState::SUCCESS;
+
+
DorisMetrics::instance()->base_compaction_deltas_total->increment(_input_rowsets.size());
+
DorisMetrics::instance()->base_compaction_bytes_total->increment(_input_rowsets_size);
+ base_output_size << _output_rowset->data_disk_size();
+
+ return Status::OK();
+}
+
+Status CloudBaseCompaction::modify_rowsets() {
Review Comment:
warning: function 'modify_rowsets' exceeds recommended size/complexity
thresholds [readability-function-size]
```cpp
Status CloudBaseCompaction::modify_rowsets() {
^
```
<details>
<summary>Additional context</summary>
**be/src/cloud/cloud_base_compaction.cpp:232:** 82 lines including
whitespace and comments (threshold 80)
```cpp
Status CloudBaseCompaction::modify_rowsets() {
^
```
</details>
##########
be/src/cloud/cloud_cumulative_compaction.cpp:
##########
@@ -0,0 +1,460 @@
+#include "cloud/cloud_cumulative_compaction.h"
+
+#include "cloud/cloud_meta_mgr.h"
+#include "cloud/config.h"
+#include "common/config.h"
+#include "common/logging.h"
+#include "common/status.h"
+#include "common/sync_point.h"
+#include "gen_cpp/cloud.pb.h"
+#include "olap/compaction.h"
+#include "olap/cumulative_compaction_policy.h"
+#include "service/backend_options.h"
+#include "util/trace.h"
+#include "util/uuid_generator.h"
+
+namespace doris {
+using namespace ErrorCode;
+
+bvar::Adder<uint64_t> cumu_output_size("cumu_compaction", "output_size");
+
+CloudCumulativeCompaction::CloudCumulativeCompaction(CloudStorageEngine&
engine,
+ CloudTabletSPtr tablet)
+ : CloudCompactionMixin(engine, std::move(tablet),
+ "BaseCompaction:" +
std::to_string(tablet->tablet_id())) {
+ auto uuid = UUIDGenerator::instance()->next_uuid();
+ std::stringstream ss;
+ ss << uuid;
+ _uuid = ss.str();
+}
+
+CloudCumulativeCompaction::~CloudCumulativeCompaction() = default;
+
+Status CloudCumulativeCompaction::prepare_compact() {
+ if (_tablet->tablet_state() != TABLET_RUNNING) {
+ return Status::InternalError("invalid tablet state. tablet_id={}",
_tablet->tablet_id());
+ }
+
+ std::vector<std::shared_ptr<CloudCumulativeCompaction>> cumu_compactions;
+ _engine.get_cumu_compaction(_tablet->tablet_id(), cumu_compactions);
+ if (!cumu_compactions.empty()) {
+ for (auto& cumu : cumu_compactions) {
+ _max_conflict_version =
+ std::max(_max_conflict_version,
cumu->_input_rowsets.back()->end_version());
+ }
+ }
+
+ int tried = 0;
+PREPARE_TRY_AGAIN:
+
+ bool need_sync_tablet = true;
+ {
+ std::shared_lock rlock(_tablet->get_header_lock());
+ // If number of rowsets is equal to approximate_num_rowsets, it is
very likely that this tablet has been
+ // synchronized with meta-service.
+ if (_tablet->tablet_meta()->all_rs_metas().size() >=
+ cloud_tablet()->fetch_add_approximate_num_rowsets(0) &&
+ cloud_tablet()->last_sync_time_s > 0) {
+ need_sync_tablet = false;
+ }
+ }
+ if (need_sync_tablet) {
+ RETURN_IF_ERROR(cloud_tablet()->sync_rowsets());
+ }
+
+ // pick rowsets to compact
+ auto st = pick_rowsets_to_compact();
+ if (!st.ok()) {
+ if (tried == 0 && _last_delete_version.first != -1) {
+ // we meet a delete version, should increase the cumulative point
to let base compaction handle the delete version.
+ // plus 1 to skip the delete version.
+ // NOTICE: after that, the cumulative point may be larger than max
version of this tablet, but it doesn't matter.
+ update_cumulative_point();
+ }
+ return st;
+ }
+
+ // prepare compaction job
+ cloud::TabletJobInfoPB job;
+ auto idx = job.mutable_idx();
+ idx->set_tablet_id(_tablet->tablet_id());
+ idx->set_table_id(_tablet->table_id());
+ idx->set_index_id(_tablet->index_id());
+ idx->set_partition_id(_tablet->partition_id());
+ auto compaction_job = job.add_compaction();
+ compaction_job->set_id(_uuid);
+ compaction_job->set_initiator(BackendOptions::get_localhost() + ':' +
+
std::to_string(config::heartbeat_service_port));
+ compaction_job->set_type(cloud::TabletCompactionJobPB::CUMULATIVE);
+ compaction_job->set_base_compaction_cnt(_base_compaction_cnt);
+ compaction_job->set_cumulative_compaction_cnt(_cumulative_compaction_cnt);
+ using namespace std::chrono;
+ int64_t now =
duration_cast<seconds>(system_clock::now().time_since_epoch()).count();
+ _expiration = now + config::compaction_timeout_seconds;
+ compaction_job->set_expiration(_expiration);
+ compaction_job->set_lease(now + config::lease_compaction_interval_seconds
* 4);
+ if (config::enable_parallel_cumu_compaction) {
+ // Set input version range to let meta-service judge version range
conflict
+
compaction_job->add_input_versions(_input_rowsets.front()->start_version());
+
compaction_job->add_input_versions(_input_rowsets.back()->end_version());
+ }
+ cloud::StartTabletJobResponse resp;
+ st = _engine.meta_mgr().prepare_tablet_job(job, &resp);
+ if (!st.ok()) {
+ if (resp.status().code() == cloud::STALE_TABLET_CACHE) {
+ // set last_sync_time to 0 to force sync tablet next time
+ cloud_tablet()->last_sync_time_s = 0;
+ } else if (resp.status().code() == cloud::TABLET_NOT_FOUND) {
+ // tablet not found
+ cloud_tablet()->recycle_cached_data();
+ } else if (resp.status().code() == cloud::JOB_TABLET_BUSY) {
+ if (config::enable_parallel_cumu_compaction &&
resp.version_in_compaction_size() > 0 &&
+ ++tried <= 2) {
+ _max_conflict_version =
*std::max_element(resp.version_in_compaction().begin(),
+
resp.version_in_compaction().end());
+ LOG_INFO("retry pick input rowsets")
+ .tag("job_id", _uuid)
+ .tag("max_conflict_version", _max_conflict_version)
+ .tag("tried", tried)
+ .tag("msg", resp.status().msg());
+ goto PREPARE_TRY_AGAIN;
+ } else {
+ LOG_WARNING("failed to prepare cumu compaction")
+ .tag("job_id", _uuid)
+ .tag("msg", resp.status().msg());
+ return Status::Error<CUMULATIVE_NO_SUITABLE_VERSION>("no
suitable versions");
+ }
+ }
+ return st;
+ }
+
+ for (auto& rs : _input_rowsets) {
+ _input_row_num += rs->num_rows();
+ _input_segments += rs->num_segments();
+ _input_rowsets_size += rs->data_disk_size();
+ }
+ LOG_INFO("start CloudCumulativeCompaction, tablet_id={}, range=[{}-{}]",
_tablet->tablet_id(),
+ _input_rowsets.front()->start_version(),
_input_rowsets.back()->end_version())
+ .tag("job_id", _uuid)
+ .tag("input_rowsets", _input_rowsets.size())
+ .tag("input_rows", _input_row_num)
+ .tag("input_segments", _input_segments)
+ .tag("input_data_size", _input_rowsets_size)
+ .tag("tablet_max_version", cloud_tablet()->max_version_unlocked())
+ .tag("cumulative_point", cloud_tablet()->cumulative_layer_point())
+ .tag("num_rowsets",
cloud_tablet()->fetch_add_approximate_num_rowsets(0))
+ .tag("cumu_num_rowsets",
cloud_tablet()->fetch_add_approximate_cumu_num_rowsets(0));
+ return st;
+}
+
+Status CloudCumulativeCompaction::execute_compact() {
+
TEST_SYNC_POINT_RETURN_WITH_VALUE("CloudCumulativeCompaction::execute_compact_impl",
+ Status::OK(), this);
+ using namespace std::chrono;
+ auto start = steady_clock::now();
+ RETURN_IF_ERROR(CloudCompactionMixin::execute_compact());
+ LOG_INFO("finish CloudCumulativeCompaction, tablet_id={}, cost={}ms",
_tablet->tablet_id(),
+ duration_cast<milliseconds>(steady_clock::now() - start).count())
+ .tag("job_id", _uuid)
+ .tag("input_rowsets", _input_rowsets.size())
+ .tag("input_rows", _input_row_num)
+ .tag("input_segments", _input_segments)
+ .tag("input_data_size", _input_rowsets_size)
+ .tag("output_rows", _output_rowset->num_rows())
+ .tag("output_segments", _output_rowset->num_segments())
+ .tag("output_data_size", _output_rowset->data_disk_size())
+ .tag("tablet_max_version", _tablet->max_version_unlocked())
+ .tag("cumulative_point", cloud_tablet()->cumulative_layer_point())
+ .tag("num_rowsets",
cloud_tablet()->fetch_add_approximate_num_rowsets(0))
+ .tag("cumu_num_rowsets",
cloud_tablet()->fetch_add_approximate_cumu_num_rowsets(0));
+
+ _state = CompactionState::SUCCESS;
+
+
DorisMetrics::instance()->cumulative_compaction_deltas_total->increment(_input_rowsets.size());
+
DorisMetrics::instance()->cumulative_compaction_bytes_total->increment(_input_rowsets_size);
+ cumu_output_size << _output_rowset->data_disk_size();
+
+ return Status::OK();
+}
+
+Status CloudCumulativeCompaction::modify_rowsets() {
Review Comment:
warning: function 'modify_rowsets' exceeds recommended size/complexity
thresholds [readability-function-size]
```cpp
Status CloudCumulativeCompaction::modify_rowsets() {
^
```
<details>
<summary>Additional context</summary>
**be/src/cloud/cloud_cumulative_compaction.cpp:179:** 102 lines including
whitespace and comments (threshold 80)
```cpp
Status CloudCumulativeCompaction::modify_rowsets() {
^
```
</details>
##########
be/src/cloud/cloud_cumulative_compaction_policy.cpp:
##########
@@ -0,0 +1,206 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "cloud/cloud_cumulative_compaction_policy.h"
+
+#include <algorithm>
+#include <list>
+#include <ostream>
+#include <string>
+
+#include "cloud/config.h"
+#include "common/config.h"
+#include "common/logging.h"
+#include "common/sync_point.h"
+#include "olap/olap_common.h"
+#include "olap/tablet.h"
+#include "olap/tablet_meta.h"
+
+namespace doris {
+
+CloudSizeBasedCumulativeCompactionPolicy::CloudSizeBasedCumulativeCompactionPolicy(
+ int64_t promotion_size, double promotion_ratio, int64_t
promotion_min_size,
+ int64_t compaction_min_size)
+ : _promotion_size(promotion_size),
+ _promotion_ratio(promotion_ratio),
+ _promotion_min_size(promotion_min_size),
+ _compaction_min_size(compaction_min_size) {}
+
+int64_t CloudSizeBasedCumulativeCompactionPolicy::_level_size(const int64_t
size) {
+ if (size < 1024) return 0;
+ int64_t max_level = (int64_t)1
+ << (sizeof(_promotion_size) * 8 - 1 -
__builtin_clzl(_promotion_size / 2));
+ if (size >= max_level) return max_level;
+ return (int64_t)1 << (sizeof(size) * 8 - 1 - __builtin_clzl(size));
+}
+
+int CloudSizeBasedCumulativeCompactionPolicy::pick_input_rowsets(
Review Comment:
warning: function 'pick_input_rowsets' exceeds recommended size/complexity
thresholds [readability-function-size]
```cpp
int CloudSizeBasedCumulativeCompactionPolicy::pick_input_rowsets(
^
```
<details>
<summary>Additional context</summary>
**be/src/cloud/cloud_cumulative_compaction_policy.cpp:50:** 127 lines
including whitespace and comments (threshold 80)
```cpp
int CloudSizeBasedCumulativeCompactionPolicy::pick_input_rowsets(
^
```
</details>
##########
be/src/cloud/cloud_tablet.h:
##########
@@ -109,6 +109,66 @@ class CloudTablet final : public BaseTablet {
void set_cumulative_compaction_cnt(int64_t cnt) {
_cumulative_compaction_cnt = cnt; }
void set_cumulative_layer_point(int64_t new_point);
+ int64_t last_cumu_compaction_failure_time() { return
_last_cumu_compaction_failure_millis; }
+ void set_last_cumu_compaction_failure_time(int64_t millis) {
+ _last_cumu_compaction_failure_millis = millis;
+ }
+
+ int64_t last_base_compaction_failure_time() { return
_last_base_compaction_failure_millis; }
+ void set_last_base_compaction_failure_time(int64_t millis) {
+ _last_base_compaction_failure_millis = millis;
+ }
+
+ int64_t last_full_compaction_failure_time() { return
_last_full_compaction_failure_millis; }
+ void set_last_full_compaction_failure_time(int64_t millis) {
+ _last_full_compaction_failure_millis = millis;
+ }
+
+ int64_t last_cumu_compaction_success_time() { return
_last_cumu_compaction_success_millis; }
+ void set_last_cumu_compaction_success_time(int64_t millis) {
+ _last_cumu_compaction_success_millis = millis;
+ }
+
+ int64_t last_base_compaction_success_time() { return
_last_base_compaction_success_millis; }
+ void set_last_base_compaction_success_time(int64_t millis) {
+ _last_base_compaction_success_millis = millis;
+ }
+
+ int64_t last_full_compaction_success_time() { return
_last_full_compaction_success_millis; }
+ void set_last_full_compaction_success_time(int64_t millis) {
+ _last_full_compaction_success_millis = millis;
+ }
+
+ int64_t last_base_compaction_schedule_time() { return
_last_base_compaction_schedule_millis; }
+ void set_last_base_compaction_schedule_time(int64_t millis) {
+ _last_base_compaction_schedule_millis = millis;
+ }
+
+ std::vector<RowsetSharedPtr> pick_candidate_rowsets_to_base_compaction();
+
+ void traverse_rowsets(std::function<void(const RowsetSharedPtr&)> visitor,
+ bool include_stale = false) {
+ std::shared_lock rlock(_meta_lock);
+ for (auto& [v, rs] : _rs_version_map) {
+ visitor(rs);
+ }
+ if (!include_stale) return;
+ for (auto& [v, rs] : _stale_rs_version_map) {
+ visitor(rs);
+ }
Review Comment:
warning: method 'traverse_rowsets' can be made static
[readability-convert-member-functions-to-static]
```suggestion
static void traverse_rowsets(std::function<void(const RowsetSharedPtr&)>
visitor,
```
##########
be/src/cloud/cloud_storage_engine.cpp:
##########
@@ -190,4 +235,419 @@
}
}
+void CloudStorageEngine::get_cumu_compaction(
+ int64_t tablet_id,
std::vector<std::shared_ptr<CloudCumulativeCompaction>>& res) {
+ std::lock_guard lock(_compaction_mtx);
+ if (auto it = _submitted_cumu_compactions.find(tablet_id);
+ it != _submitted_cumu_compactions.end()) {
+ res = it->second;
+ }
+}
+
+void CloudStorageEngine::_adjust_compaction_thread_num() {
+ int base_thread_num = get_base_thread_num();
+ if (_base_compaction_thread_pool->max_threads() != base_thread_num) {
+ int old_max_threads = _base_compaction_thread_pool->max_threads();
+ Status status =
_base_compaction_thread_pool->set_max_threads(base_thread_num);
+ if (status.ok()) {
+ VLOG_NOTICE << "update base compaction thread pool max_threads
from " << old_max_threads
+ << " to " << base_thread_num;
+ }
+ }
+ if (_base_compaction_thread_pool->min_threads() != base_thread_num) {
+ int old_min_threads = _base_compaction_thread_pool->min_threads();
+ Status status =
_base_compaction_thread_pool->set_min_threads(base_thread_num);
+ if (status.ok()) {
+ VLOG_NOTICE << "update base compaction thread pool min_threads
from " << old_min_threads
+ << " to " << base_thread_num;
+ }
+ }
+
+ int cumu_thread_num = get_cumu_thread_num();
+ if (_cumu_compaction_thread_pool->max_threads() != cumu_thread_num) {
+ int old_max_threads = _cumu_compaction_thread_pool->max_threads();
+ Status status =
_cumu_compaction_thread_pool->set_max_threads(cumu_thread_num);
+ if (status.ok()) {
+ VLOG_NOTICE << "update cumu compaction thread pool max_threads
from " << old_max_threads
+ << " to " << cumu_thread_num;
+ }
+ }
+ if (_cumu_compaction_thread_pool->min_threads() != cumu_thread_num) {
+ int old_min_threads = _cumu_compaction_thread_pool->min_threads();
+ Status status =
_cumu_compaction_thread_pool->set_min_threads(cumu_thread_num);
+ if (status.ok()) {
+ VLOG_NOTICE << "update cumu compaction thread pool min_threads
from " << old_min_threads
+ << " to " << cumu_thread_num;
+ }
+ }
+}
+
+void CloudStorageEngine::_compaction_tasks_producer_callback() {
+ LOG(INFO) << "try to start compaction producer process!";
+
+ int round = 0;
+ CompactionType compaction_type;
+
+ // Used to record the time when the score metric was last updated.
+ // The update of the score metric is accompanied by the logic of selecting
the tablet.
+ // If there is no slot available, the logic of selecting the tablet will
be terminated,
+ // which causes the score metric update to be terminated.
+ // In order to avoid this situation, we need to update the score regularly.
+ int64_t last_cumulative_score_update_time = 0;
+ int64_t last_base_score_update_time = 0;
+ static const int64_t check_score_interval_ms = 5000; // 5 secs
+
+ int64_t interval = config::generate_compaction_tasks_interval_ms;
+ do {
+ if (!config::disable_auto_compaction) {
+ _adjust_compaction_thread_num();
+
+ bool check_score = false;
+ int64_t cur_time = UnixMillis();
+ if (round <
config::cumulative_compaction_rounds_for_each_base_compaction_round) {
+ compaction_type = CompactionType::CUMULATIVE_COMPACTION;
+ round++;
+ if (cur_time - last_cumulative_score_update_time >=
check_score_interval_ms) {
+ check_score = true;
+ last_cumulative_score_update_time = cur_time;
+ }
+ } else {
+ compaction_type = CompactionType::BASE_COMPACTION;
+ round = 0;
+ if (cur_time - last_base_score_update_time >=
check_score_interval_ms) {
+ check_score = true;
+ last_base_score_update_time = cur_time;
+ }
+ }
+ std::unique_ptr<ThreadPool>& thread_pool =
+ (compaction_type == CompactionType::CUMULATIVE_COMPACTION)
+ ? _cumu_compaction_thread_pool
+ : _base_compaction_thread_pool;
+ VLOG_CRITICAL << "compaction thread pool. type: "
+ << (compaction_type ==
CompactionType::CUMULATIVE_COMPACTION ? "CUMU"
+
: "BASE")
+ << ", num_threads: " << thread_pool->num_threads()
+ << ", num_threads_pending_start: "
+ << thread_pool->num_threads_pending_start()
+ << ", num_active_threads: " <<
thread_pool->num_active_threads()
+ << ", max_threads: " << thread_pool->max_threads()
+ << ", min_threads: " << thread_pool->min_threads()
+ << ", num_total_queued_tasks: " <<
thread_pool->get_queue_size();
+ std::vector<CloudTabletSPtr> tablets_compaction =
+ _generate_cloud_compaction_tasks(compaction_type,
check_score);
+
+ /// Regardless of whether the tablet is submitted for compaction
or not,
+ /// we need to call 'reset_compaction' to clean up the
base_compaction or cumulative_compaction objects
+ /// in the tablet, because these two objects store the tablet's
own shared_ptr.
+ /// If it is not cleaned up, the reference count of the tablet
will always be greater than 1,
+ /// thus cannot be collected by the garbage collector.
(TabletManager::start_trash_sweep)
+ for (const auto& tablet : tablets_compaction) {
+ Status st = submit_compaction_task(tablet, compaction_type);
+ if (st.ok()) continue;
+ if ((!st.is<ErrorCode::BE_NO_SUITABLE_VERSION>() &&
+ !st.is<ErrorCode::CUMULATIVE_NO_SUITABLE_VERSION>()) ||
+ VLOG_DEBUG_IS_ON) {
+ LOG(WARNING) << "failed to submit compaction task for
tablet: "
+ << tablet->tablet_id() << ", err: " << st;
+ }
+ }
+ interval = config::generate_compaction_tasks_interval_ms;
+ } else {
+ interval = config::check_auto_compaction_interval_seconds * 1000;
+ }
+ } while
(!_stop_background_threads_latch.wait_for(std::chrono::milliseconds(interval)));
+}
+
+std::vector<CloudTabletSPtr>
CloudStorageEngine::_generate_cloud_compaction_tasks(
Review Comment:
warning: function '_generate_cloud_compaction_tasks' exceeds recommended
size/complexity thresholds [readability-function-size]
```cpp
std::vector<CloudTabletSPtr>
CloudStorageEngine::_generate_cloud_compaction_tasks(
^
```
<details>
<summary>Additional context</summary>
**be/src/cloud/cloud_storage_engine.cpp:360:** 86 lines including whitespace
and comments (threshold 80)
```cpp
std::vector<CloudTabletSPtr>
CloudStorageEngine::_generate_cloud_compaction_tasks(
^
```
</details>
##########
be/src/cloud/cloud_cumulative_compaction.cpp:
##########
@@ -0,0 +1,460 @@
+#include "cloud/cloud_cumulative_compaction.h"
+
+#include "cloud/cloud_meta_mgr.h"
+#include "cloud/config.h"
+#include "common/config.h"
+#include "common/logging.h"
+#include "common/status.h"
+#include "common/sync_point.h"
+#include "gen_cpp/cloud.pb.h"
+#include "olap/compaction.h"
+#include "olap/cumulative_compaction_policy.h"
+#include "service/backend_options.h"
+#include "util/trace.h"
+#include "util/uuid_generator.h"
+
+namespace doris {
+using namespace ErrorCode;
+
+bvar::Adder<uint64_t> cumu_output_size("cumu_compaction", "output_size");
+
+CloudCumulativeCompaction::CloudCumulativeCompaction(CloudStorageEngine&
engine,
+ CloudTabletSPtr tablet)
+ : CloudCompactionMixin(engine, std::move(tablet),
+ "BaseCompaction:" +
std::to_string(tablet->tablet_id())) {
+ auto uuid = UUIDGenerator::instance()->next_uuid();
+ std::stringstream ss;
+ ss << uuid;
+ _uuid = ss.str();
+}
+
+CloudCumulativeCompaction::~CloudCumulativeCompaction() = default;
+
+Status CloudCumulativeCompaction::prepare_compact() {
Review Comment:
warning: function 'prepare_compact' exceeds recommended size/complexity
thresholds [readability-function-size]
```cpp
Status CloudCumulativeCompaction::prepare_compact() {
^
```
<details>
<summary>Additional context</summary>
**be/src/cloud/cloud_cumulative_compaction.cpp:32:** 115 lines including
whitespace and comments (threshold 80)
```cpp
Status CloudCumulativeCompaction::prepare_compact() {
^
```
</details>
##########
be/src/olap/compaction.cpp:
##########
@@ -844,4 +847,80 @@
}
}
+void CloudCompactionMixin::build_basic_info() {
+ _output_version =
+ Version(_input_rowsets.front()->start_version(),
_input_rowsets.back()->end_version());
+
+ _newest_write_timestamp = _input_rowsets.back()->newest_write_timestamp();
+
+ std::vector<RowsetMetaSharedPtr> rowset_metas(_input_rowsets.size());
+ std::transform(_input_rowsets.begin(), _input_rowsets.end(),
rowset_metas.begin(),
+ [](const RowsetSharedPtr& rowset) { return
rowset->rowset_meta(); });
+ _cur_tablet_schema =
_tablet->tablet_schema_with_merged_max_schema_version(rowset_metas);
+}
+
+int64_t CloudCompactionMixin::get_compaction_permits() {
Review Comment:
warning: method 'get_compaction_permits' can be made static
[readability-convert-member-functions-to-static]
be/src/olap/compaction.h:187:
```diff
- int64_t get_compaction_permits();
+ static int64_t get_compaction_permits();
```
##########
be/src/olap/compaction.cpp:
##########
@@ -844,4 +847,80 @@ void Compaction::_load_segment_to_cache() {
}
}
+void CloudCompactionMixin::build_basic_info() {
Review Comment:
warning: method 'build_basic_info' can be made static
[readability-convert-member-functions-to-static]
be/src/olap/compaction.h:183:
```diff
- void build_basic_info();
+ static void build_basic_info();
```
##########
be/src/cloud/cloud_tablet.h:
##########
@@ -109,6 +109,66 @@
void set_cumulative_compaction_cnt(int64_t cnt) {
_cumulative_compaction_cnt = cnt; }
void set_cumulative_layer_point(int64_t new_point);
+ int64_t last_cumu_compaction_failure_time() { return
_last_cumu_compaction_failure_millis; }
+ void set_last_cumu_compaction_failure_time(int64_t millis) {
+ _last_cumu_compaction_failure_millis = millis;
+ }
+
+ int64_t last_base_compaction_failure_time() { return
_last_base_compaction_failure_millis; }
+ void set_last_base_compaction_failure_time(int64_t millis) {
+ _last_base_compaction_failure_millis = millis;
+ }
+
+ int64_t last_full_compaction_failure_time() { return
_last_full_compaction_failure_millis; }
+ void set_last_full_compaction_failure_time(int64_t millis) {
+ _last_full_compaction_failure_millis = millis;
+ }
+
+ int64_t last_cumu_compaction_success_time() { return
_last_cumu_compaction_success_millis; }
+ void set_last_cumu_compaction_success_time(int64_t millis) {
+ _last_cumu_compaction_success_millis = millis;
+ }
+
+ int64_t last_base_compaction_success_time() { return
_last_base_compaction_success_millis; }
+ void set_last_base_compaction_success_time(int64_t millis) {
+ _last_base_compaction_success_millis = millis;
+ }
+
+ int64_t last_full_compaction_success_time() { return
_last_full_compaction_success_millis; }
+ void set_last_full_compaction_success_time(int64_t millis) {
+ _last_full_compaction_success_millis = millis;
+ }
+
+ int64_t last_base_compaction_schedule_time() { return
_last_base_compaction_schedule_millis; }
+ void set_last_base_compaction_schedule_time(int64_t millis) {
+ _last_base_compaction_schedule_millis = millis;
+ }
+
+ std::vector<RowsetSharedPtr> pick_candidate_rowsets_to_base_compaction();
+
+ void traverse_rowsets(std::function<void(const RowsetSharedPtr&)> visitor,
+ bool include_stale = false) {
+ std::shared_lock rlock(_meta_lock);
+ for (auto& [v, rs] : _rs_version_map) {
+ visitor(rs);
+ }
+ if (!include_stale) return;
+ for (auto& [v, rs] : _stale_rs_version_map) {
+ visitor(rs);
+ }
+ }
+
+ inline Version max_version() const {
+ std::shared_lock rdlock(_meta_lock);
+ return _tablet_meta->max_version();
+ }
+
+ int64_t base_size() const { return _base_size; }
+
+ std::vector<RowsetSharedPtr>
pick_candidate_rowsets_to_single_replica_compaction();
+
+ std::vector<RowsetSharedPtr> pick_candidate_rowsets_to_full_compaction();
Review Comment:
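warning: method 'max_version' can be made static
[readability-convert-member-functions-to-static]
NOTE(review): this looks like a false positive. The quoted body takes a
`std::shared_lock` on `_meta_lock` and reads `_tablet_meta`, and the original
accessor is `const`. These appear to be instance members, so the suggested
`static` version would presumably not compile. Confirm before applying.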
```suggestion
static inline Version max_version() {
```
##########
be/src/cloud/cloud_tablet.h:
##########
@@ -109,6 +109,66 @@
void set_cumulative_compaction_cnt(int64_t cnt) {
_cumulative_compaction_cnt = cnt; }
void set_cumulative_layer_point(int64_t new_point);
+ int64_t last_cumu_compaction_failure_time() { return
_last_cumu_compaction_failure_millis; }
+ void set_last_cumu_compaction_failure_time(int64_t millis) {
+ _last_cumu_compaction_failure_millis = millis;
+ }
+
+ int64_t last_base_compaction_failure_time() { return
_last_base_compaction_failure_millis; }
+ void set_last_base_compaction_failure_time(int64_t millis) {
+ _last_base_compaction_failure_millis = millis;
+ }
+
+ int64_t last_full_compaction_failure_time() { return
_last_full_compaction_failure_millis; }
+ void set_last_full_compaction_failure_time(int64_t millis) {
+ _last_full_compaction_failure_millis = millis;
+ }
+
+ int64_t last_cumu_compaction_success_time() { return
_last_cumu_compaction_success_millis; }
+ void set_last_cumu_compaction_success_time(int64_t millis) {
+ _last_cumu_compaction_success_millis = millis;
+ }
+
+ int64_t last_base_compaction_success_time() { return
_last_base_compaction_success_millis; }
+ void set_last_base_compaction_success_time(int64_t millis) {
+ _last_base_compaction_success_millis = millis;
+ }
+
+ int64_t last_full_compaction_success_time() { return
_last_full_compaction_success_millis; }
+ void set_last_full_compaction_success_time(int64_t millis) {
+ _last_full_compaction_success_millis = millis;
+ }
+
+ int64_t last_base_compaction_schedule_time() { return
_last_base_compaction_schedule_millis; }
+ void set_last_base_compaction_schedule_time(int64_t millis) {
+ _last_base_compaction_schedule_millis = millis;
+ }
+
+ std::vector<RowsetSharedPtr> pick_candidate_rowsets_to_base_compaction();
+
+ void traverse_rowsets(std::function<void(const RowsetSharedPtr&)> visitor,
+ bool include_stale = false) {
+ std::shared_lock rlock(_meta_lock);
+ for (auto& [v, rs] : _rs_version_map) {
+ visitor(rs);
+ }
+ if (!include_stale) return;
+ for (auto& [v, rs] : _stale_rs_version_map) {
+ visitor(rs);
+ }
+ }
+
+ inline Version max_version() const {
+ std::shared_lock rdlock(_meta_lock);
+ return _tablet_meta->max_version();
+ }
Review Comment:
warning: statement should be inside braces
[readability-braces-around-statements]
```suggestion
if (!include_stale) { return;
}
```
##########
be/src/cloud/cloud_storage_engine.cpp:
##########
@@ -190,4 +235,419 @@ void CloudStorageEngine::_sync_tablets_thread_callback() {
}
}
+void CloudStorageEngine::get_cumu_compaction(
+ int64_t tablet_id,
std::vector<std::shared_ptr<CloudCumulativeCompaction>>& res) {
+ std::lock_guard lock(_compaction_mtx);
+ if (auto it = _submitted_cumu_compactions.find(tablet_id);
+ it != _submitted_cumu_compactions.end()) {
+ res = it->second;
+ }
+}
+
+void CloudStorageEngine::_adjust_compaction_thread_num() {
Review Comment:
warning: method '_adjust_compaction_thread_num' can be made static
[readability-convert-member-functions-to-static]
be/src/cloud/cloud_storage_engine.h:85:
```diff
- void _adjust_compaction_thread_num();
+ static void _adjust_compaction_thread_num();
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]