github-actions[bot] commented on code in PR #31215: URL: https://github.com/apache/doris/pull/31215#discussion_r1497123801
########## be/src/cloud/cloud_cumulative_compaction_policy.cpp: ########## @@ -0,0 +1,205 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "cloud/cloud_cumulative_compaction_policy.h" + +#include <algorithm> +#include <list> +#include <ostream> +#include <string> + +#include "common/config.h" +#include "common/logging.h" +#include "common/sync_point.h" +#include "cloud/config.h" +#include "olap/olap_common.h" +#include "olap/tablet.h" +#include "olap/tablet_meta.h" + +namespace doris { + +CloudSizeBasedCumulativeCompactionPolicy::CloudSizeBasedCumulativeCompactionPolicy( + int64_t promotion_size, double promotion_ratio, int64_t promotion_min_size, + int64_t compaction_min_size) + : _promotion_size(promotion_size), + _promotion_ratio(promotion_ratio), + _promotion_min_size(promotion_min_size), + _compaction_min_size(compaction_min_size) {} + +int64_t CloudSizeBasedCumulativeCompactionPolicy::_level_size(const int64_t size) { + if (size < 1024) return 0; Review Comment: warning: statement should be inside braces [readability-braces-around-statements] ```suggestion if (size < 1024) { return 0; } ``` ########## be/src/cloud/cloud_cumulative_compaction.cpp: ########## @@ -0,0 +1,457 @@ +#include 
"cloud/cloud_cumulative_compaction.h" + +#include "cloud/config.h" +#include "cloud/cloud_meta_mgr.h" +#include "common/config.h" +#include "common/logging.h" +#include "common/status.h" +#include "common/sync_point.h" +#include "gen_cpp/cloud.pb.h" +#include "olap/compaction.h" +#include "olap/cumulative_compaction_policy.h" +#include "service/backend_options.h" +#include "util/trace.h" +#include "util/uuid_generator.h" + +namespace doris { +using namespace ErrorCode; + +bvar::Adder<uint64_t> cumu_output_size("cumu_compaction", "output_size"); + +CloudCumulativeCompaction::CloudCumulativeCompaction(CloudStorageEngine& engine, CloudTabletSPtr tablet) + : CloudCompactionMixin(engine, std::move(tablet), "BaseCompaction:" + std::to_string(tablet->tablet_id())) { + auto uuid = UUIDGenerator::instance()->next_uuid(); + std::stringstream ss; + ss << uuid; + _uuid = ss.str(); +} + +CloudCumulativeCompaction::~CloudCumulativeCompaction() = default; + +Status CloudCumulativeCompaction::prepare_compact() { Review Comment: warning: function 'prepare_compact' exceeds recommended size/complexity thresholds [readability-function-size] ```cpp Status CloudCumulativeCompaction::prepare_compact() { ^ ``` <details> <summary>Additional context</summary> **be/src/cloud/cloud_cumulative_compaction.cpp:30:** 115 lines including whitespace and comments (threshold 80) ```cpp Status CloudCumulativeCompaction::prepare_compact() { ^ ``` </details> ########## be/src/cloud/cloud_cumulative_compaction_policy.cpp: ########## @@ -0,0 +1,205 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "cloud/cloud_cumulative_compaction_policy.h" + +#include <algorithm> +#include <list> +#include <ostream> +#include <string> + +#include "common/config.h" +#include "common/logging.h" +#include "common/sync_point.h" +#include "cloud/config.h" +#include "olap/olap_common.h" +#include "olap/tablet.h" +#include "olap/tablet_meta.h" + +namespace doris { + +CloudSizeBasedCumulativeCompactionPolicy::CloudSizeBasedCumulativeCompactionPolicy( + int64_t promotion_size, double promotion_ratio, int64_t promotion_min_size, + int64_t compaction_min_size) + : _promotion_size(promotion_size), + _promotion_ratio(promotion_ratio), + _promotion_min_size(promotion_min_size), + _compaction_min_size(compaction_min_size) {} + +int64_t CloudSizeBasedCumulativeCompactionPolicy::_level_size(const int64_t size) { + if (size < 1024) return 0; + int64_t max_level = (int64_t)1 + << (sizeof(_promotion_size) * 8 - 1 - __builtin_clzl(_promotion_size / 2)); + if (size >= max_level) return max_level; + return (int64_t)1 << (sizeof(size) * 8 - 1 - __builtin_clzl(size)); +} + +int CloudSizeBasedCumulativeCompactionPolicy::pick_input_rowsets( Review Comment: warning: function 'pick_input_rowsets' exceeds recommended size/complexity thresholds [readability-function-size] ```cpp int CloudSizeBasedCumulativeCompactionPolicy::pick_input_rowsets( ^ ``` <details> <summary>Additional context</summary> **be/src/cloud/cloud_cumulative_compaction_policy.cpp:50:** 126 lines including whitespace and comments (threshold 80) ```cpp int 
CloudSizeBasedCumulativeCompactionPolicy::pick_input_rowsets( ^ ``` </details> ########## be/src/cloud/cloud_cumulative_compaction_policy.h: ########## @@ -0,0 +1,73 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include <stddef.h> +#include <stdint.h> + +#include <memory> +#include <string> +#include <vector> + +#include "common/config.h" +#include "cloud/cloud_tablet.h" +#include "olap/rowset/rowset.h" +#include "olap/rowset/rowset_meta.h" + +namespace doris { + +class Tablet; +struct Version; + +class CloudSizeBasedCumulativeCompactionPolicy { +public: + CloudSizeBasedCumulativeCompactionPolicy( + int64_t promotion_size = config::compaction_promotion_size_mbytes * 1024 * 1024, + double promotion_ratio = config::compaction_promotion_ratio, + int64_t promotion_min_size = config::compaction_promotion_min_size_mbytes * 1024 * 1024, + int64_t compaction_min_size = config::compaction_min_size_mbytes * 1024 * 1024); + + ~CloudSizeBasedCumulativeCompactionPolicy() {} + + int64_t new_cumulative_point(CloudTablet* tablet, const RowsetSharedPtr& output_rowset, + Version& last_delete_version, + int64_t last_cumulative_point); + + int pick_input_rowsets(CloudTablet* tablet, const 
std::vector<RowsetSharedPtr>& candidate_rowsets, + const int64_t max_compaction_score, const int64_t min_compaction_score, + std::vector<RowsetSharedPtr>* input_rowsets, + Version* last_delete_version, size_t* compaction_score, + bool allow_delete = false); + +private: + int64_t _level_size(const int64_t size); + + int64_t cloud_promotion_size(CloudTablet* tablet) const; + +private: Review Comment: warning: redundant access specifier has the same accessibility as the previous access specifier [readability-redundant-access-specifiers] ```suggestion ``` <details> <summary>Additional context</summary> **be/src/cloud/cloud_cumulative_compaction_policy.h:56:** previously declared here ```cpp private: ^ ``` </details> ########## be/src/cloud/cloud_cumulative_compaction.cpp: ########## @@ -0,0 +1,457 @@ +#include "cloud/cloud_cumulative_compaction.h" + +#include "cloud/config.h" +#include "cloud/cloud_meta_mgr.h" +#include "common/config.h" +#include "common/logging.h" +#include "common/status.h" +#include "common/sync_point.h" +#include "gen_cpp/cloud.pb.h" +#include "olap/compaction.h" +#include "olap/cumulative_compaction_policy.h" +#include "service/backend_options.h" +#include "util/trace.h" +#include "util/uuid_generator.h" + +namespace doris { +using namespace ErrorCode; + +bvar::Adder<uint64_t> cumu_output_size("cumu_compaction", "output_size"); + +CloudCumulativeCompaction::CloudCumulativeCompaction(CloudStorageEngine& engine, CloudTabletSPtr tablet) + : CloudCompactionMixin(engine, std::move(tablet), "BaseCompaction:" + std::to_string(tablet->tablet_id())) { + auto uuid = UUIDGenerator::instance()->next_uuid(); + std::stringstream ss; + ss << uuid; + _uuid = ss.str(); +} + +CloudCumulativeCompaction::~CloudCumulativeCompaction() = default; + +Status CloudCumulativeCompaction::prepare_compact() { + if (_tablet->tablet_state() != TABLET_RUNNING) { + return Status::InternalError("invalid tablet state. 
tablet_id={}", _tablet->tablet_id()); + } + + std::vector<std::shared_ptr<CloudCumulativeCompaction>> cumu_compactions; + _engine.get_cumu_compaction(_tablet->tablet_id(), cumu_compactions); + if (!cumu_compactions.empty()) { + for (auto& cumu : cumu_compactions) { + _max_conflict_version = + std::max(_max_conflict_version, cumu->_input_rowsets.back()->end_version()); + } + } + + int tried = 0; +PREPARE_TRY_AGAIN: + + bool need_sync_tablet = true; + { + std::shared_lock rlock(_tablet->get_header_lock()); + // If number of rowsets is equal to approximate_num_rowsets, it is very likely that this tablet has been + // synchronized with meta-service. + if (_tablet->tablet_meta()->all_rs_metas().size() >= + cloud_tablet()->fetch_add_approximate_num_rowsets(0) && + cloud_tablet()->last_sync_time_s > 0) { + need_sync_tablet = false; + } + } + if (need_sync_tablet) { + RETURN_IF_ERROR(cloud_tablet()->sync_rowsets()); + } + + // pick rowsets to compact + auto st = pick_rowsets_to_compact(); + if (!st.ok()) { + if (tried == 0 && _last_delete_version.first != -1) { + // we meet a delete version, should increase the cumulative point to let base compaction handle the delete version. + // plus 1 to skip the delete version. + // NOTICE: after that, the cumulative point may be larger than max version of this tablet, but it doesn't matter. 
+ update_cumulative_point(); + } + return st; + } + + // prepare compaction job + cloud::TabletJobInfoPB job; + auto idx = job.mutable_idx(); + idx->set_tablet_id(_tablet->tablet_id()); + idx->set_table_id(_tablet->table_id()); + idx->set_index_id(_tablet->index_id()); + idx->set_partition_id(_tablet->partition_id()); + auto compaction_job = job.add_compaction(); + compaction_job->set_id(_uuid); + compaction_job->set_initiator(BackendOptions::get_localhost() + ':' + + std::to_string(config::heartbeat_service_port)); + compaction_job->set_type(cloud::TabletCompactionJobPB::CUMULATIVE); + compaction_job->set_base_compaction_cnt(_base_compaction_cnt); + compaction_job->set_cumulative_compaction_cnt(_cumulative_compaction_cnt); + using namespace std::chrono; + int64_t now = duration_cast<seconds>(system_clock::now().time_since_epoch()).count(); + _expiration = now + config::compaction_timeout_seconds; + compaction_job->set_expiration(_expiration); + compaction_job->set_lease(now + config::lease_compaction_interval_seconds * 4); + if (config::enable_parallel_cumu_compaction) { + // Set input version range to let meta-service judge version range conflict + compaction_job->add_input_versions(_input_rowsets.front()->start_version()); + compaction_job->add_input_versions(_input_rowsets.back()->end_version()); + } + cloud::StartTabletJobResponse resp; + st = _engine.meta_mgr().prepare_tablet_job(job, &resp); + if (!st.ok()) { + if (resp.status().code() == cloud::STALE_TABLET_CACHE) { + // set last_sync_time to 0 to force sync tablet next time + cloud_tablet()->last_sync_time_s = 0; + } else if (resp.status().code() == cloud::TABLET_NOT_FOUND) { + // tablet not found + cloud_tablet()->recycle_cached_data(); + } else if (resp.status().code() == cloud::JOB_TABLET_BUSY) { + if (config::enable_parallel_cumu_compaction && resp.version_in_compaction_size() > 0 && + ++tried <= 2) { + _max_conflict_version = *std::max_element(resp.version_in_compaction().begin(), + 
resp.version_in_compaction().end()); + LOG_INFO("retry pick input rowsets") + .tag("job_id", _uuid) + .tag("max_conflict_version", _max_conflict_version) + .tag("tried", tried) + .tag("msg", resp.status().msg()); + goto PREPARE_TRY_AGAIN; + } else { + LOG_WARNING("failed to prepare cumu compaction") + .tag("job_id", _uuid) + .tag("msg", resp.status().msg()); + return Status::Error<CUMULATIVE_NO_SUITABLE_VERSION>("no suitable versions"); + } + } + return st; + } + + for (auto& rs : _input_rowsets) { + _input_row_num += rs->num_rows(); + _input_segments += rs->num_segments(); + _input_rowsets_size += rs->data_disk_size(); + } + LOG_INFO("start CloudCumulativeCompaction, tablet_id={}, range=[{}-{}]", _tablet->tablet_id(), + _input_rowsets.front()->start_version(), _input_rowsets.back()->end_version()) + .tag("job_id", _uuid) + .tag("input_rowsets", _input_rowsets.size()) + .tag("input_rows", _input_row_num) + .tag("input_segments", _input_segments) + .tag("input_data_size", _input_rowsets_size) + .tag("tablet_max_version", cloud_tablet()->max_version_unlocked()) + .tag("cumulative_point", cloud_tablet()->cumulative_layer_point()) + .tag("num_rowsets", cloud_tablet()->fetch_add_approximate_num_rowsets(0)) + .tag("cumu_num_rowsets", cloud_tablet()->fetch_add_approximate_cumu_num_rowsets(0)); + return st; +} + +Status CloudCumulativeCompaction::execute_compact() { + TEST_SYNC_POINT_RETURN_WITH_VALUE("CloudCumulativeCompaction::execute_compact_impl", + Status::OK(), this); + using namespace std::chrono; + auto start = steady_clock::now(); + RETURN_IF_ERROR(CloudCompactionMixin::execute_compact()); + LOG_INFO("finish CloudCumulativeCompaction, tablet_id={}, cost={}ms", _tablet->tablet_id(), + duration_cast<milliseconds>(steady_clock::now() - start).count()) + .tag("job_id", _uuid) + .tag("input_rowsets", _input_rowsets.size()) + .tag("input_rows", _input_row_num) + .tag("input_segments", _input_segments) + .tag("input_data_size", _input_rowsets_size) + .tag("output_rows", 
_output_rowset->num_rows()) + .tag("output_segments", _output_rowset->num_segments()) + .tag("output_data_size", _output_rowset->data_disk_size()) + .tag("tablet_max_version", _tablet->max_version_unlocked()) + .tag("cumulative_point", cloud_tablet()->cumulative_layer_point()) + .tag("num_rowsets", cloud_tablet()->fetch_add_approximate_num_rowsets(0)) + .tag("cumu_num_rowsets", cloud_tablet()->fetch_add_approximate_cumu_num_rowsets(0)); + + _state = CompactionState::SUCCESS; + + DorisMetrics::instance()->cumulative_compaction_deltas_total->increment(_input_rowsets.size()); + DorisMetrics::instance()->cumulative_compaction_bytes_total->increment(_input_rowsets_size); + cumu_output_size << _output_rowset->data_disk_size(); + + return Status::OK(); +} + +Status CloudCumulativeCompaction::modify_rowsets() { Review Comment: warning: function 'modify_rowsets' exceeds recommended size/complexity thresholds [readability-function-size] ```cpp Status CloudCumulativeCompaction::modify_rowsets() { ^ ``` <details> <summary>Additional context</summary> **be/src/cloud/cloud_cumulative_compaction.cpp:177:** 102 lines including whitespace and comments (threshold 80) ```cpp Status CloudCumulativeCompaction::modify_rowsets() { ^ ``` </details> ########## be/src/cloud/cloud_cumulative_compaction_policy.h: ########## @@ -0,0 +1,73 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include <stddef.h> +#include <stdint.h> + +#include <memory> +#include <string> +#include <vector> + +#include "common/config.h" +#include "cloud/cloud_tablet.h" +#include "olap/rowset/rowset.h" +#include "olap/rowset/rowset_meta.h" + +namespace doris { + +class Tablet; +struct Version; + +class CloudSizeBasedCumulativeCompactionPolicy { +public: + CloudSizeBasedCumulativeCompactionPolicy( + int64_t promotion_size = config::compaction_promotion_size_mbytes * 1024 * 1024, + double promotion_ratio = config::compaction_promotion_ratio, + int64_t promotion_min_size = config::compaction_promotion_min_size_mbytes * 1024 * 1024, + int64_t compaction_min_size = config::compaction_min_size_mbytes * 1024 * 1024); + + ~CloudSizeBasedCumulativeCompactionPolicy() {} + + int64_t new_cumulative_point(CloudTablet* tablet, const RowsetSharedPtr& output_rowset, + Version& last_delete_version, + int64_t last_cumulative_point); + + int pick_input_rowsets(CloudTablet* tablet, const std::vector<RowsetSharedPtr>& candidate_rowsets, + const int64_t max_compaction_score, const int64_t min_compaction_score, Review Comment: warning: parameter 'max_compaction_score' is const-qualified in the function declaration; const-qualification of parameters only has an effect in function definitions [readability-avoid-const-params-in-decls] ```suggestion int64_t max_compaction_score, const int64_t min_compaction_score, ``` ########## be/src/cloud/cloud_tablet.h: ########## @@ -108,6 +108,75 @@ class CloudTablet final : public BaseTablet { void 
set_base_compaction_cnt(int64_t cnt) { _base_compaction_cnt = cnt; } void set_cumulative_compaction_cnt(int64_t cnt) { _cumulative_compaction_cnt = cnt; } void set_cumulative_layer_point(int64_t new_point); + + /* + int64_t base_compaction_cnt() const { return _base_compaction_cnt; } + void set_base_compaction_cnt(int64_t cnt) { _base_compaction_cnt = cnt; } + int64_t cumulative_compaction_cnt() const { return _cumulative_compaction_cnt; } + void set_cumulative_compaction_cnt(int64_t cnt) { _cumulative_compaction_cnt = cnt; } + int64_t full_compaction_cnt() const { return _full_compaction_cnt; } + void set_full_compaction_cnt(int64_t cnt) { _full_compaction_cnt = cnt; } + */ + + int64_t last_cumu_compaction_failure_time() { return _last_cumu_compaction_failure_millis; } + void set_last_cumu_compaction_failure_time(int64_t millis) { + _last_cumu_compaction_failure_millis = millis; + } + + int64_t last_base_compaction_failure_time() { return _last_base_compaction_failure_millis; } + void set_last_base_compaction_failure_time(int64_t millis) { + _last_base_compaction_failure_millis = millis; + } + + int64_t last_full_compaction_failure_time() { return _last_full_compaction_failure_millis; } + void set_last_full_compaction_failure_time(int64_t millis) { + _last_full_compaction_failure_millis = millis; + } + + int64_t last_cumu_compaction_success_time() { return _last_cumu_compaction_success_millis; } + void set_last_cumu_compaction_success_time(int64_t millis) { + _last_cumu_compaction_success_millis = millis; + } + + int64_t last_base_compaction_success_time() { return _last_base_compaction_success_millis; } + void set_last_base_compaction_success_time(int64_t millis) { + _last_base_compaction_success_millis = millis; + } + + int64_t last_full_compaction_success_time() { return _last_full_compaction_success_millis; } + void set_last_full_compaction_success_time(int64_t millis) { + _last_full_compaction_success_millis = millis; + } + + int64_t 
last_base_compaction_schedule_time() { return _last_base_compaction_schedule_millis; } + void set_last_base_compaction_schedule_time(int64_t millis) { + _last_base_compaction_schedule_millis = millis; + } + + std::vector<RowsetSharedPtr> pick_candidate_rowsets_to_base_compaction(); + + void traverse_rowsets(std::function<void(const RowsetSharedPtr&)> visitor, Review Comment: warning: method 'traverse_rowsets' can be made static [readability-convert-member-functions-to-static] ```suggestion static void traverse_rowsets(std::function<void(const RowsetSharedPtr&)> visitor, ``` ########## be/src/cloud/cloud_tablet.h: ########## @@ -108,6 +108,75 @@ void set_base_compaction_cnt(int64_t cnt) { _base_compaction_cnt = cnt; } void set_cumulative_compaction_cnt(int64_t cnt) { _cumulative_compaction_cnt = cnt; } void set_cumulative_layer_point(int64_t new_point); + + /* + int64_t base_compaction_cnt() const { return _base_compaction_cnt; } + void set_base_compaction_cnt(int64_t cnt) { _base_compaction_cnt = cnt; } + int64_t cumulative_compaction_cnt() const { return _cumulative_compaction_cnt; } + void set_cumulative_compaction_cnt(int64_t cnt) { _cumulative_compaction_cnt = cnt; } + int64_t full_compaction_cnt() const { return _full_compaction_cnt; } + void set_full_compaction_cnt(int64_t cnt) { _full_compaction_cnt = cnt; } + */ + + int64_t last_cumu_compaction_failure_time() { return _last_cumu_compaction_failure_millis; } + void set_last_cumu_compaction_failure_time(int64_t millis) { + _last_cumu_compaction_failure_millis = millis; + } + + int64_t last_base_compaction_failure_time() { return _last_base_compaction_failure_millis; } + void set_last_base_compaction_failure_time(int64_t millis) { + _last_base_compaction_failure_millis = millis; + } + + int64_t last_full_compaction_failure_time() { return _last_full_compaction_failure_millis; } + void set_last_full_compaction_failure_time(int64_t millis) { + _last_full_compaction_failure_millis = millis; + } + + int64_t 
last_cumu_compaction_success_time() { return _last_cumu_compaction_success_millis; } + void set_last_cumu_compaction_success_time(int64_t millis) { + _last_cumu_compaction_success_millis = millis; + } + + int64_t last_base_compaction_success_time() { return _last_base_compaction_success_millis; } + void set_last_base_compaction_success_time(int64_t millis) { + _last_base_compaction_success_millis = millis; + } + + int64_t last_full_compaction_success_time() { return _last_full_compaction_success_millis; } + void set_last_full_compaction_success_time(int64_t millis) { + _last_full_compaction_success_millis = millis; + } + + int64_t last_base_compaction_schedule_time() { return _last_base_compaction_schedule_millis; } + void set_last_base_compaction_schedule_time(int64_t millis) { + _last_base_compaction_schedule_millis = millis; + } + + std::vector<RowsetSharedPtr> pick_candidate_rowsets_to_base_compaction(); + + void traverse_rowsets(std::function<void(const RowsetSharedPtr&)> visitor, + bool include_stale = false) { + std::shared_lock rlock(_meta_lock); + for (auto& [v, rs] : _rs_version_map) { + visitor(rs); + } + if (!include_stale) return; Review Comment: warning: statement should be inside braces [readability-braces-around-statements] ```suggestion if (!include_stale) { return; } ``` ########## be/src/cloud/cloud_storage_engine.cpp: ########## @@ -190,4 +234,418 @@ } } +void CloudStorageEngine::get_cumu_compaction( + int64_t tablet_id, std::vector<std::shared_ptr<CloudCumulativeCompaction>>& res) { + std::lock_guard lock(_compaction_mtx); + if (auto it = _submitted_cumu_compactions.find(tablet_id); + it != _submitted_cumu_compactions.end()) { + res = it->second; + } +} + +void CloudStorageEngine::_adjust_compaction_thread_num() { + int base_thread_num = get_base_thread_num(); + if (_base_compaction_thread_pool->max_threads() != base_thread_num) { + int old_max_threads = _base_compaction_thread_pool->max_threads(); + Status status = 
_base_compaction_thread_pool->set_max_threads(base_thread_num); + if (status.ok()) { + VLOG_NOTICE << "update base compaction thread pool max_threads from " << old_max_threads + << " to " << base_thread_num; + } + } + if (_base_compaction_thread_pool->min_threads() != base_thread_num) { + int old_min_threads = _base_compaction_thread_pool->min_threads(); + Status status = _base_compaction_thread_pool->set_min_threads(base_thread_num); + if (status.ok()) { + VLOG_NOTICE << "update base compaction thread pool min_threads from " << old_min_threads + << " to " << base_thread_num; + } + } + + int cumu_thread_num = get_cumu_thread_num(); + if (_cumu_compaction_thread_pool->max_threads() != cumu_thread_num) { + int old_max_threads = _cumu_compaction_thread_pool->max_threads(); + Status status = _cumu_compaction_thread_pool->set_max_threads(cumu_thread_num); + if (status.ok()) { + VLOG_NOTICE << "update cumu compaction thread pool max_threads from " << old_max_threads + << " to " << cumu_thread_num; + } + } + if (_cumu_compaction_thread_pool->min_threads() != cumu_thread_num) { + int old_min_threads = _cumu_compaction_thread_pool->min_threads(); + Status status = _cumu_compaction_thread_pool->set_min_threads(cumu_thread_num); + if (status.ok()) { + VLOG_NOTICE << "update cumu compaction thread pool min_threads from " << old_min_threads + << " to " << cumu_thread_num; + } + } +} + +void CloudStorageEngine::_compaction_tasks_producer_callback() { + LOG(INFO) << "try to start compaction producer process!"; + + int round = 0; + CompactionType compaction_type; + + // Used to record the time when the score metric was last updated. + // The update of the score metric is accompanied by the logic of selecting the tablet. + // If there is no slot available, the logic of selecting the tablet will be terminated, + // which causes the score metric update to be terminated. + // In order to avoid this situation, we need to update the score regularly. 
+ int64_t last_cumulative_score_update_time = 0; + int64_t last_base_score_update_time = 0; + static const int64_t check_score_interval_ms = 5000; // 5 secs + + int64_t interval = config::generate_compaction_tasks_interval_ms; + do { + if (!config::disable_auto_compaction) { + _adjust_compaction_thread_num(); + + bool check_score = false; + int64_t cur_time = UnixMillis(); + if (round < config::cumulative_compaction_rounds_for_each_base_compaction_round) { + compaction_type = CompactionType::CUMULATIVE_COMPACTION; + round++; + if (cur_time - last_cumulative_score_update_time >= check_score_interval_ms) { + check_score = true; + last_cumulative_score_update_time = cur_time; + } + } else { + compaction_type = CompactionType::BASE_COMPACTION; + round = 0; + if (cur_time - last_base_score_update_time >= check_score_interval_ms) { + check_score = true; + last_base_score_update_time = cur_time; + } + } + std::unique_ptr<ThreadPool>& thread_pool = + (compaction_type == CompactionType::CUMULATIVE_COMPACTION) + ? _cumu_compaction_thread_pool + : _base_compaction_thread_pool; + VLOG_CRITICAL << "compaction thread pool. type: " + << (compaction_type == CompactionType::CUMULATIVE_COMPACTION ? "CUMU" + : "BASE") + << ", num_threads: " << thread_pool->num_threads() + << ", num_threads_pending_start: " + << thread_pool->num_threads_pending_start() + << ", num_active_threads: " << thread_pool->num_active_threads() + << ", max_threads: " << thread_pool->max_threads() + << ", min_threads: " << thread_pool->min_threads() + << ", num_total_queued_tasks: " << thread_pool->get_queue_size(); + std::vector<CloudTabletSPtr> tablets_compaction = + _generate_cloud_compaction_tasks(compaction_type, check_score); + + /// Regardless of whether the tablet is submitted for compaction or not, + /// we need to call 'reset_compaction' to clean up the base_compaction or cumulative_compaction objects + /// in the tablet, because these two objects store the tablet's own shared_ptr. 
+ /// If it is not cleaned up, the reference count of the tablet will always be greater than 1, + /// thus cannot be collected by the garbage collector. (TabletManager::start_trash_sweep) + for (const auto& tablet : tablets_compaction) { + Status st = submit_compaction_task(tablet, compaction_type); + if (st.ok()) continue; + if ((!st.is<ErrorCode::BE_NO_SUITABLE_VERSION>() && + !st.is<ErrorCode::CUMULATIVE_NO_SUITABLE_VERSION>()) || + VLOG_DEBUG_IS_ON) { + LOG(WARNING) << "failed to submit compaction task for tablet: " + << tablet->tablet_id() << ", err: " << st; + } + } + interval = config::generate_compaction_tasks_interval_ms; + } else { + interval = config::check_auto_compaction_interval_seconds * 1000; + } + } while (!_stop_background_threads_latch.wait_for(std::chrono::milliseconds(interval))); +} + +std::vector<CloudTabletSPtr> CloudStorageEngine::_generate_cloud_compaction_tasks( + CompactionType compaction_type, bool check_score) { + std::vector<std::shared_ptr<CloudTablet>> tablets_compaction; + + int64_t max_compaction_score = 0; + std::unordered_set<int64_t> tablet_preparing_cumu_compaction; + std::unordered_map<int64_t, std::vector<std::shared_ptr<CloudCumulativeCompaction>>> + submitted_cumu_compactions; + std::unordered_map<int64_t, std::shared_ptr<CloudBaseCompaction>> submitted_base_compactions; + std::unordered_map<int64_t, std::shared_ptr<CloudFullCompaction>> submitted_full_compactions; + { + std::lock_guard lock(_compaction_mtx); + tablet_preparing_cumu_compaction = _tablet_preparing_cumu_compaction; + submitted_cumu_compactions = _submitted_cumu_compactions; + submitted_base_compactions = _submitted_base_compactions; + submitted_full_compactions = _submitted_full_compactions; + } + + bool need_pick_tablet = true; + int thread_per_disk = + config::compaction_task_num_per_fast_disk; // all disks are fast in cloud mode + int num_cumu = + std::accumulate(submitted_cumu_compactions.begin(), submitted_cumu_compactions.end(), 0, + [](int a, auto& b) 
{ return a + b.second.size(); }); + int num_base = submitted_base_compactions.size() + submitted_full_compactions.size(); + int n = thread_per_disk - num_cumu - num_base; + if (compaction_type == CompactionType::BASE_COMPACTION) { + // We need to reserve at least one thread for cumulative compaction, + // because base compactions may take too long to complete, which may + // leads to "too many rowsets" error. + int base_n = std::min(config::max_base_compaction_task_num_per_disk, thread_per_disk - 1) - + num_base; + n = std::min(base_n, n); + } + if (n <= 0) { // No threads available + if (!check_score) return tablets_compaction; Review Comment: warning: statement should be inside braces [readability-braces-around-statements] ```suggestion if (!check_score) { return tablets_compaction; } ``` ########## be/src/cloud/cloud_tablet.h: ########## @@ -108,6 +108,75 @@ void set_base_compaction_cnt(int64_t cnt) { _base_compaction_cnt = cnt; } void set_cumulative_compaction_cnt(int64_t cnt) { _cumulative_compaction_cnt = cnt; } void set_cumulative_layer_point(int64_t new_point); + + /* + int64_t base_compaction_cnt() const { return _base_compaction_cnt; } + void set_base_compaction_cnt(int64_t cnt) { _base_compaction_cnt = cnt; } + int64_t cumulative_compaction_cnt() const { return _cumulative_compaction_cnt; } + void set_cumulative_compaction_cnt(int64_t cnt) { _cumulative_compaction_cnt = cnt; } + int64_t full_compaction_cnt() const { return _full_compaction_cnt; } + void set_full_compaction_cnt(int64_t cnt) { _full_compaction_cnt = cnt; } + */ + + int64_t last_cumu_compaction_failure_time() { return _last_cumu_compaction_failure_millis; } + void set_last_cumu_compaction_failure_time(int64_t millis) { + _last_cumu_compaction_failure_millis = millis; + } + + int64_t last_base_compaction_failure_time() { return _last_base_compaction_failure_millis; } + void set_last_base_compaction_failure_time(int64_t millis) { + _last_base_compaction_failure_millis = millis; + } + + 
int64_t last_full_compaction_failure_time() { return _last_full_compaction_failure_millis; } + void set_last_full_compaction_failure_time(int64_t millis) { + _last_full_compaction_failure_millis = millis; + } + + int64_t last_cumu_compaction_success_time() { return _last_cumu_compaction_success_millis; } + void set_last_cumu_compaction_success_time(int64_t millis) { + _last_cumu_compaction_success_millis = millis; + } + + int64_t last_base_compaction_success_time() { return _last_base_compaction_success_millis; } + void set_last_base_compaction_success_time(int64_t millis) { + _last_base_compaction_success_millis = millis; + } + + int64_t last_full_compaction_success_time() { return _last_full_compaction_success_millis; } + void set_last_full_compaction_success_time(int64_t millis) { + _last_full_compaction_success_millis = millis; + } + + int64_t last_base_compaction_schedule_time() { return _last_base_compaction_schedule_millis; } + void set_last_base_compaction_schedule_time(int64_t millis) { + _last_base_compaction_schedule_millis = millis; + } + + std::vector<RowsetSharedPtr> pick_candidate_rowsets_to_base_compaction(); + + void traverse_rowsets(std::function<void(const RowsetSharedPtr&)> visitor, + bool include_stale = false) { + std::shared_lock rlock(_meta_lock); + for (auto& [v, rs] : _rs_version_map) { + visitor(rs); + } + if (!include_stale) return; + for (auto& [v, rs] : _stale_rs_version_map) { + visitor(rs); + } + } + + inline Version max_version() const { Review Comment: warning: method 'max_version' can be made static [readability-convert-member-functions-to-static] ```suggestion static inline Version max_version() { ``` ########## be/src/cloud/cloud_cumulative_compaction_policy.cpp: ########## @@ -0,0 +1,205 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "cloud/cloud_cumulative_compaction_policy.h" + +#include <algorithm> +#include <list> +#include <ostream> +#include <string> + +#include "common/config.h" +#include "common/logging.h" +#include "common/sync_point.h" +#include "cloud/config.h" +#include "olap/olap_common.h" +#include "olap/tablet.h" +#include "olap/tablet_meta.h" + +namespace doris { + +CloudSizeBasedCumulativeCompactionPolicy::CloudSizeBasedCumulativeCompactionPolicy( + int64_t promotion_size, double promotion_ratio, int64_t promotion_min_size, + int64_t compaction_min_size) + : _promotion_size(promotion_size), + _promotion_ratio(promotion_ratio), + _promotion_min_size(promotion_min_size), + _compaction_min_size(compaction_min_size) {} + +int64_t CloudSizeBasedCumulativeCompactionPolicy::_level_size(const int64_t size) { + if (size < 1024) return 0; + int64_t max_level = (int64_t)1 + << (sizeof(_promotion_size) * 8 - 1 - __builtin_clzl(_promotion_size / 2)); + if (size >= max_level) return max_level; + return (int64_t)1 << (sizeof(size) * 8 - 1 - __builtin_clzl(size)); +} + +int CloudSizeBasedCumulativeCompactionPolicy::pick_input_rowsets( + CloudTablet* tablet, const std::vector<RowsetSharedPtr>& candidate_rowsets, + const int64_t max_compaction_score, const int64_t min_compaction_score, + std::vector<RowsetSharedPtr>* input_rowsets, Version* last_delete_version, + size_t* compaction_score, bool 
allow_delete) { + //size_t promotion_size = tablet->cumulative_promotion_size(); + auto max_version = tablet->max_version().first; + int transient_size = 0; + *compaction_score = 0; + int64_t total_size = 0; + for (auto& rowset : candidate_rowsets) { + // check whether this rowset is delete version + if (!allow_delete && rowset->rowset_meta()->has_delete_predicate()) { + *last_delete_version = rowset->version(); + if (!input_rowsets->empty()) { + // we meet a delete version, and there were other versions before. + // we should compact those version before handling them over to base compaction + break; + } else { + // we meet a delete version, and no other versions before, skip it and continue + input_rowsets->clear(); + *compaction_score = 0; + transient_size = 0; + continue; + } + } + if (tablet->tablet_state() == TABLET_NOTREADY) { + // If tablet under alter, keep latest 10 version so that base tablet max version + // not merged in new tablet, and then we can copy data from base tablet + if (rowset->version().second < max_version - 10) { + continue; + } + } + if (*compaction_score >= max_compaction_score) { + // got enough segments + break; + } + *compaction_score += rowset->rowset_meta()->get_compaction_score(); + total_size += rowset->rowset_meta()->total_disk_size(); + + transient_size += 1; + input_rowsets->push_back(rowset); + } + + // if there is delete version, do compaction directly + if (last_delete_version->first != -1) { + if (input_rowsets->size() == 1) { + auto rs_meta = input_rowsets->front()->rowset_meta(); + // if there is only one rowset and not overlapping, + // we do not need to do cumulative compaction + if (!rs_meta->is_segments_overlapping()) { + input_rowsets->clear(); + *compaction_score = 0; + } + } + return transient_size; + } + + auto rs_begin = input_rowsets->begin(); + size_t new_compaction_score = *compaction_score; + while (rs_begin != input_rowsets->end()) { + auto& rs_meta = (*rs_begin)->rowset_meta(); + int current_level = 
_level_size(rs_meta->total_disk_size()); + int remain_level = _level_size(total_size - rs_meta->total_disk_size()); + // if current level less then remain level, input rowsets contain current rowset + // and process return; otherwise, input rowsets do not contain current rowset. + if (current_level <= remain_level) { + break; + } + total_size -= rs_meta->total_disk_size(); + new_compaction_score -= rs_meta->get_compaction_score(); + ++rs_begin; + } + if (rs_begin == input_rowsets->end()) { // No suitable level size found in `input_rowsets` + if (config::prioritize_query_perf_in_compaction && tablet->keys_type() != DUP_KEYS) { + // While tablet's key type is not `DUP_KEYS`, compacting rowset in such tablets has a significant + // positive impact on queries and reduces space amplification, so we ignore level limitation and + // pick candidate rowsets as input rowsets. + return transient_size; + } else if (*compaction_score >= max_compaction_score) { + // Score of `input_rowsets` exceed max compaction score, which means `input_rowsets` will never change and + // this tablet will never execute cumulative compaction. MUST execute compaction on these `input_rowsets` + // to reduce compaction score. 
+ RowsetSharedPtr rs_with_max_score; + uint32_t max_score = 1; + for (auto& rs : *input_rowsets) { + if (rs->rowset_meta()->get_compaction_score() > max_score) { + max_score = rs->rowset_meta()->get_compaction_score(); + rs_with_max_score = rs; + } + } + if (rs_with_max_score) { + input_rowsets->clear(); + input_rowsets->push_back(std::move(rs_with_max_score)); + *compaction_score = max_score; + return transient_size; + } + // Exceeding max compaction score, do compaction on all candidate rowsets anyway + return transient_size; + } + } + input_rowsets->erase(input_rowsets->begin(), rs_begin); + *compaction_score = new_compaction_score; + + VLOG_CRITICAL << "cumulative compaction size_based policy, compaction_score = " + << *compaction_score << ", total_size = " << total_size + //<< ", calc promotion size value = " << promotion_size + << ", tablet = " << tablet->tablet_id() << ", input_rowset size " + << input_rowsets->size(); + + // empty return + if (input_rowsets->empty()) { + return transient_size; + } + + // if we have a sufficient number of segments, we should process the compaction. + // otherwise, we check number of segments and total_size whether can do compaction. + if (total_size < _compaction_min_size && *compaction_score < min_compaction_score) { + input_rowsets->clear(); + *compaction_score = 0; + } else if (total_size >= _compaction_min_size && input_rowsets->size() == 1) { + auto rs_meta = input_rowsets->front()->rowset_meta(); + // if there is only one rowset and not overlapping, + // we do not need to do compaction + if (!rs_meta->is_segments_overlapping()) { + input_rowsets->clear(); + *compaction_score = 0; + } + } + return transient_size; +} + +int64_t CloudSizeBasedCumulativeCompactionPolicy::cloud_promotion_size(CloudTablet* t) const { + int64_t promotion_size = t->base_size() * _promotion_ratio; + // promotion_size is between _size_based_promotion_size and _size_based_promotion_min_size + return promotion_size > _promotion_size ? 
_promotion_size + : promotion_size < _promotion_min_size ? _promotion_min_size + : promotion_size; +} + +int64_t CloudSizeBasedCumulativeCompactionPolicy::new_cumulative_point( Review Comment: warning: method 'new_cumulative_point' can be made static [readability-convert-member-functions-to-static] be/src/cloud/cloud_cumulative_compaction_policy.h:46: ```diff - int64_t new_cumulative_point(CloudTablet* tablet, const RowsetSharedPtr& output_rowset, + static int64_t new_cumulative_point(CloudTablet* tablet, const RowsetSharedPtr& output_rowset, ``` ########## be/src/cloud/cloud_cumulative_compaction_policy.h: ########## @@ -0,0 +1,73 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include <stddef.h> +#include <stdint.h> Review Comment: warning: inclusion of deprecated C++ header 'stdint.h'; consider using 'cstdint' instead [modernize-deprecated-headers] ```suggestion #include <cstdint> ``` ########## be/src/cloud/cloud_cumulative_compaction_policy.cpp: ########## @@ -0,0 +1,205 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "cloud/cloud_cumulative_compaction_policy.h" + +#include <algorithm> +#include <list> +#include <ostream> +#include <string> + +#include "common/config.h" +#include "common/logging.h" +#include "common/sync_point.h" +#include "cloud/config.h" +#include "olap/olap_common.h" +#include "olap/tablet.h" +#include "olap/tablet_meta.h" + +namespace doris { + +CloudSizeBasedCumulativeCompactionPolicy::CloudSizeBasedCumulativeCompactionPolicy( + int64_t promotion_size, double promotion_ratio, int64_t promotion_min_size, + int64_t compaction_min_size) + : _promotion_size(promotion_size), + _promotion_ratio(promotion_ratio), + _promotion_min_size(promotion_min_size), + _compaction_min_size(compaction_min_size) {} + +int64_t CloudSizeBasedCumulativeCompactionPolicy::_level_size(const int64_t size) { + if (size < 1024) return 0; + int64_t max_level = (int64_t)1 + << (sizeof(_promotion_size) * 8 - 1 - __builtin_clzl(_promotion_size / 2)); + if (size >= max_level) return max_level; Review Comment: warning: statement should be inside braces [readability-braces-around-statements] ```suggestion if (size >= max_level) { return max_level; } ``` ########## be/src/cloud/cloud_base_compaction.cpp: ########## @@ -0,0 +1,369 @@ +#include "cloud/cloud_base_compaction.h" + +#include <boost/container_hash/hash.hpp> + +#include "cloud/cloud_meta_mgr.h" +#include "cloud/config.h" +#include 
"common/config.h" +#include "common/sync_point.h" +#include "gen_cpp/cloud.pb.h" +#include "olap/compaction.h" +#include "olap/task/engine_checksum_task.h" +#include "service/backend_options.h" +#include "util/thread.h" +#include "util/uuid_generator.h" + +namespace doris { +using namespace ErrorCode; + +bvar::Adder<uint64_t> base_output_size("base_compaction", "output_size"); + +CloudBaseCompaction::CloudBaseCompaction(CloudStorageEngine& engine, CloudTabletSPtr tablet) + : CloudCompactionMixin(engine, std::move(tablet), "BaseCompaction:" + std::to_string(tablet->tablet_id())) { + auto uuid = UUIDGenerator::instance()->next_uuid(); + std::stringstream ss; + ss << uuid; + _uuid = ss.str(); +} + +CloudBaseCompaction::~CloudBaseCompaction() = default; + +Status CloudBaseCompaction::prepare_compact() { + if (_tablet->tablet_state() != TABLET_RUNNING) { + return Status::InternalError("invalid tablet state. tablet_id={}", _tablet->tablet_id()); + } + + bool need_sync_tablet = true; + { + std::shared_lock rlock(_tablet->get_header_lock()); + // If number of rowsets is equal to approximate_num_rowsets, it is very likely that this tablet has been + // synchronized with meta-service. 
+ if (_tablet->tablet_meta()->all_rs_metas().size() >= + cloud_tablet()->fetch_add_approximate_num_rowsets(0) && + cloud_tablet()->last_sync_time_s > 0) { + need_sync_tablet = false; + } + } + if (need_sync_tablet) { + RETURN_IF_ERROR(cloud_tablet()->sync_rowsets()); + } + + RETURN_IF_ERROR(pick_rowsets_to_compact()); + + // prepare compaction job + cloud::TabletJobInfoPB job; + auto idx = job.mutable_idx(); + idx->set_tablet_id(_tablet->tablet_id()); + idx->set_table_id(_tablet->table_id()); + idx->set_index_id(_tablet->index_id()); + idx->set_partition_id(_tablet->partition_id()); + auto compaction_job = job.add_compaction(); + compaction_job->set_id(_uuid); + compaction_job->set_initiator(BackendOptions::get_localhost() + ':' + + std::to_string(config::heartbeat_service_port)); + compaction_job->set_type(cloud::TabletCompactionJobPB::BASE); + compaction_job->set_base_compaction_cnt(_base_compaction_cnt); + compaction_job->set_cumulative_compaction_cnt(_cumulative_compaction_cnt); + using namespace std::chrono; + int64_t now = duration_cast<seconds>(system_clock::now().time_since_epoch()).count(); + _expiration = now + config::compaction_timeout_seconds; + compaction_job->set_expiration(_expiration); + compaction_job->set_lease(now + config::lease_compaction_interval_seconds * 4); + cloud::StartTabletJobResponse resp; + //auto st = cloud::meta_mgr()->prepare_tablet_job(job, &resp); + auto st = _engine.meta_mgr().prepare_tablet_job(job, &resp); + if (!st.ok()) { + if (resp.status().code() == cloud::STALE_TABLET_CACHE) { + // set last_sync_time to 0 to force sync tablet next time + cloud_tablet()->last_sync_time_s = 0; + } else if (resp.status().code() == cloud::TABLET_NOT_FOUND) { + // tablet not found + cloud_tablet()->recycle_cached_data(); + } + return st; + } + + for (auto& rs : _input_rowsets) { + _input_row_num += rs->num_rows(); + _input_segments += rs->num_segments(); + _input_rowsets_size += rs->data_disk_size(); + } + LOG_INFO("start CloudBaseCompaction, 
tablet_id={}, range=[{}-{}]", _tablet->tablet_id(), + _input_rowsets.front()->start_version(), _input_rowsets.back()->end_version()) + .tag("job_id", _uuid) + .tag("input_rowsets", _input_rowsets.size()) + .tag("input_rows", _input_row_num) + .tag("input_segments", _input_segments) + .tag("input_data_size", _input_rowsets_size); + return st; +} + +void CloudBaseCompaction::_filter_input_rowset() { + // if dup_key and no delete predicate + // we skip big files to save resources + if (_tablet->keys_type() != KeysType::DUP_KEYS) { + return; + } + for (auto& rs : _input_rowsets) { + if (rs->rowset_meta()->has_delete_predicate()) { + return; + } + } + int64_t max_size = config::base_compaction_dup_key_max_file_size_mbytes * 1024 * 1024; + // first find a proper rowset for start + auto rs_iter = _input_rowsets.begin(); + while (rs_iter != _input_rowsets.end()) { + if ((*rs_iter)->rowset_meta()->total_disk_size() >= max_size) { + rs_iter = _input_rowsets.erase(rs_iter); + } else { + break; + } + } +} + +Status CloudBaseCompaction::pick_rowsets_to_compact() { + _input_rowsets.clear(); + { + std::shared_lock rlock(_tablet->get_header_lock()); + _base_compaction_cnt = cloud_tablet()->base_compaction_cnt(); + _cumulative_compaction_cnt = cloud_tablet()->cumulative_compaction_cnt(); + _input_rowsets = cloud_tablet()->pick_candidate_rowsets_to_base_compaction(); + } + if (auto st = check_version_continuity(_input_rowsets); !st.ok()) { + DCHECK(false) << st; + return st; + } + _filter_input_rowset(); + if (_input_rowsets.size() <= 1) { + return Status::Error<BE_NO_SUITABLE_VERSION>( + "insuffient compation input rowset, #rowsets={}", _input_rowsets.size()); + } + + if (_input_rowsets.size() == 2 && _input_rowsets[0]->end_version() == 1) { + // the tablet is with rowset: [0-1], [2-y] + // and [0-1] has no data. in this situation, no need to do base compaction. + return Status::Error<BE_NO_SUITABLE_VERSION>("no suitable versions for compaction"); + } + + // 1. 
cumulative rowset must reach base_compaction_min_rowset_num threshold + if (_input_rowsets.size() > config::base_compaction_min_rowset_num) { + VLOG_NOTICE << "satisfy the base compaction policy. tablet=" << _tablet->tablet_id() + << ", num_cumulative_rowsets=" << _input_rowsets.size() - 1 + << ", base_compaction_num_cumulative_rowsets=" + << config::base_compaction_min_rowset_num; + return Status::OK(); + } + + // 2. the ratio between base rowset and all input cumulative rowsets reaches the threshold + // `_input_rowsets` has been sorted by end version, so we consider `_input_rowsets[0]` is the base rowset. + int64_t base_size = _input_rowsets.front()->data_disk_size(); + int64_t cumulative_total_size = 0; + for (auto it = _input_rowsets.begin() + 1; it != _input_rowsets.end(); ++it) { + cumulative_total_size += (*it)->data_disk_size(); + } + + double base_cumulative_delta_ratio = config::base_compaction_min_data_ratio; + if (base_size == 0) { + // base_size == 0 means this may be a base version [0-1], which has no data. + // set to 1 to void divide by zero + base_size = 1; + } + double cumulative_base_ratio = static_cast<double>(cumulative_total_size) / base_size; + + if (cumulative_base_ratio > base_cumulative_delta_ratio) { + VLOG_NOTICE << "satisfy the base compaction policy. tablet=" << _tablet->tablet_id() + << ", cumulative_total_size=" << cumulative_total_size + << ", base_size=" << base_size + << ", cumulative_base_ratio=" << cumulative_base_ratio + << ", policy_ratio=" << base_cumulative_delta_ratio; + return Status::OK(); + } + + // 3. 
the interval since last base compaction reaches the threshold + int64_t base_creation_time = _input_rowsets[0]->creation_time(); + int64_t interval_threshold = config::base_compaction_interval_seconds_since_last_operation; + int64_t interval_since_last_base_compaction = time(nullptr) - base_creation_time; + if (interval_since_last_base_compaction > interval_threshold) { + VLOG_NOTICE << "satisfy the base compaction policy. tablet=" << _tablet->tablet_id() + << ", interval_since_last_base_compaction=" + << interval_since_last_base_compaction + << ", interval_threshold=" << interval_threshold; + return Status::OK(); + } + + VLOG_NOTICE << "don't satisfy the base compaction policy. tablet=" << _tablet->tablet_id() + << ", num_cumulative_rowsets=" << _input_rowsets.size() - 1 + << ", cumulative_base_ratio=" << cumulative_base_ratio + << ", interval_since_last_base_compaction=" << interval_since_last_base_compaction; + return Status::Error<BE_NO_SUITABLE_VERSION>("no suitable versions for compaction"); +} + +Status CloudBaseCompaction::execute_compact() { + if (config::enable_base_compaction_idle_sched) { + Thread::set_idle_sched(); + } + + SCOPED_ATTACH_TASK(_mem_tracker); + + using namespace std::chrono; + auto start = steady_clock::now(); + RETURN_IF_ERROR(CloudCompactionMixin::execute_compact()); + LOG_INFO("finish CloudBaseCompaction, tablet_id={}, cost={}ms", _tablet->tablet_id(), + duration_cast<milliseconds>(steady_clock::now() - start).count()) + .tag("job_id", _uuid) + .tag("input_rowsets", _input_rowsets.size()) + .tag("input_rows", _input_row_num) + .tag("input_segments", _input_segments) + .tag("input_data_size", _input_rowsets_size) + .tag("output_rows", _output_rowset->num_rows()) + .tag("output_segments", _output_rowset->num_segments()) + .tag("output_data_size", _output_rowset->data_disk_size()); + + //_compaction_succeed = true; + _state = CompactionState::SUCCESS; + + 
DorisMetrics::instance()->base_compaction_deltas_total->increment(_input_rowsets.size()); + DorisMetrics::instance()->base_compaction_bytes_total->increment(_input_rowsets_size); + base_output_size << _output_rowset->data_disk_size(); + + return Status::OK(); +} + +Status CloudBaseCompaction::modify_rowsets() { Review Comment: warning: function 'modify_rowsets' exceeds recommended size/complexity thresholds [readability-function-size] ```cpp Status CloudBaseCompaction::modify_rowsets() { ^ ``` <details> <summary>Additional context</summary> **be/src/cloud/cloud_base_compaction.cpp:231:** 82 lines including whitespace and comments (threshold 80) ```cpp Status CloudBaseCompaction::modify_rowsets() { ^ ``` </details> ########## be/src/cloud/cloud_base_compaction.cpp: ########## @@ -0,0 +1,369 @@ +#include "cloud/cloud_base_compaction.h" + +#include <boost/container_hash/hash.hpp> + +#include "cloud/cloud_meta_mgr.h" +#include "cloud/config.h" +#include "common/config.h" +#include "common/sync_point.h" +#include "gen_cpp/cloud.pb.h" +#include "olap/compaction.h" +#include "olap/task/engine_checksum_task.h" +#include "service/backend_options.h" +#include "util/thread.h" +#include "util/uuid_generator.h" + +namespace doris { +using namespace ErrorCode; + +bvar::Adder<uint64_t> base_output_size("base_compaction", "output_size"); + +CloudBaseCompaction::CloudBaseCompaction(CloudStorageEngine& engine, CloudTabletSPtr tablet) + : CloudCompactionMixin(engine, std::move(tablet), "BaseCompaction:" + std::to_string(tablet->tablet_id())) { + auto uuid = UUIDGenerator::instance()->next_uuid(); + std::stringstream ss; + ss << uuid; + _uuid = ss.str(); +} + +CloudBaseCompaction::~CloudBaseCompaction() = default; + +Status CloudBaseCompaction::prepare_compact() { + if (_tablet->tablet_state() != TABLET_RUNNING) { + return Status::InternalError("invalid tablet state. 
tablet_id={}", _tablet->tablet_id()); + } + + bool need_sync_tablet = true; + { + std::shared_lock rlock(_tablet->get_header_lock()); + // If number of rowsets is equal to approximate_num_rowsets, it is very likely that this tablet has been + // synchronized with meta-service. + if (_tablet->tablet_meta()->all_rs_metas().size() >= + cloud_tablet()->fetch_add_approximate_num_rowsets(0) && + cloud_tablet()->last_sync_time_s > 0) { + need_sync_tablet = false; + } + } + if (need_sync_tablet) { + RETURN_IF_ERROR(cloud_tablet()->sync_rowsets()); + } + + RETURN_IF_ERROR(pick_rowsets_to_compact()); + + // prepare compaction job + cloud::TabletJobInfoPB job; + auto idx = job.mutable_idx(); + idx->set_tablet_id(_tablet->tablet_id()); + idx->set_table_id(_tablet->table_id()); + idx->set_index_id(_tablet->index_id()); + idx->set_partition_id(_tablet->partition_id()); + auto compaction_job = job.add_compaction(); + compaction_job->set_id(_uuid); + compaction_job->set_initiator(BackendOptions::get_localhost() + ':' + + std::to_string(config::heartbeat_service_port)); + compaction_job->set_type(cloud::TabletCompactionJobPB::BASE); + compaction_job->set_base_compaction_cnt(_base_compaction_cnt); + compaction_job->set_cumulative_compaction_cnt(_cumulative_compaction_cnt); + using namespace std::chrono; + int64_t now = duration_cast<seconds>(system_clock::now().time_since_epoch()).count(); + _expiration = now + config::compaction_timeout_seconds; + compaction_job->set_expiration(_expiration); + compaction_job->set_lease(now + config::lease_compaction_interval_seconds * 4); + cloud::StartTabletJobResponse resp; + //auto st = cloud::meta_mgr()->prepare_tablet_job(job, &resp); + auto st = _engine.meta_mgr().prepare_tablet_job(job, &resp); + if (!st.ok()) { + if (resp.status().code() == cloud::STALE_TABLET_CACHE) { + // set last_sync_time to 0 to force sync tablet next time + cloud_tablet()->last_sync_time_s = 0; + } else if (resp.status().code() == cloud::TABLET_NOT_FOUND) { + // 
tablet not found + cloud_tablet()->recycle_cached_data(); + } + return st; + } + + for (auto& rs : _input_rowsets) { + _input_row_num += rs->num_rows(); + _input_segments += rs->num_segments(); + _input_rowsets_size += rs->data_disk_size(); + } + LOG_INFO("start CloudBaseCompaction, tablet_id={}, range=[{}-{}]", _tablet->tablet_id(), + _input_rowsets.front()->start_version(), _input_rowsets.back()->end_version()) + .tag("job_id", _uuid) + .tag("input_rowsets", _input_rowsets.size()) + .tag("input_rows", _input_row_num) + .tag("input_segments", _input_segments) + .tag("input_data_size", _input_rowsets_size); + return st; +} + +void CloudBaseCompaction::_filter_input_rowset() { Review Comment: warning: method '_filter_input_rowset' can be made static [readability-convert-member-functions-to-static] be/src/cloud/cloud_base_compaction.h:29: ```diff - void _filter_input_rowset(); + static void _filter_input_rowset(); ``` ########## be/src/cloud/cloud_cumulative_compaction_policy.h: ########## @@ -0,0 +1,73 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#pragma once + +#include <stddef.h> +#include <stdint.h> + +#include <memory> +#include <string> +#include <vector> + +#include "common/config.h" +#include "cloud/cloud_tablet.h" +#include "olap/rowset/rowset.h" +#include "olap/rowset/rowset_meta.h" + +namespace doris { + +class Tablet; +struct Version; + +class CloudSizeBasedCumulativeCompactionPolicy { +public: + CloudSizeBasedCumulativeCompactionPolicy( + int64_t promotion_size = config::compaction_promotion_size_mbytes * 1024 * 1024, + double promotion_ratio = config::compaction_promotion_ratio, + int64_t promotion_min_size = config::compaction_promotion_min_size_mbytes * 1024 * 1024, + int64_t compaction_min_size = config::compaction_min_size_mbytes * 1024 * 1024); + + ~CloudSizeBasedCumulativeCompactionPolicy() {} + + int64_t new_cumulative_point(CloudTablet* tablet, const RowsetSharedPtr& output_rowset, + Version& last_delete_version, + int64_t last_cumulative_point); + + int pick_input_rowsets(CloudTablet* tablet, const std::vector<RowsetSharedPtr>& candidate_rowsets, + const int64_t max_compaction_score, const int64_t min_compaction_score, Review Comment: warning: parameter 'min_compaction_score' is const-qualified in the function declaration; const-qualification of parameters only has an effect in function definitions [readability-avoid-const-params-in-decls] ```suggestion const int64_t max_compaction_score, int64_t min_compaction_score, ``` ########## be/src/cloud/cloud_storage_engine.cpp: ########## @@ -190,4 +234,418 @@ } } +void CloudStorageEngine::get_cumu_compaction( + int64_t tablet_id, std::vector<std::shared_ptr<CloudCumulativeCompaction>>& res) { + std::lock_guard lock(_compaction_mtx); + if (auto it = _submitted_cumu_compactions.find(tablet_id); + it != _submitted_cumu_compactions.end()) { + res = it->second; + } +} + +void CloudStorageEngine::_adjust_compaction_thread_num() { Review Comment: warning: method '_adjust_compaction_thread_num' can be made static 
[readability-convert-member-functions-to-static] be/src/cloud/cloud_storage_engine.h:83: ```diff - void _adjust_compaction_thread_num(); + static void _adjust_compaction_thread_num(); ``` ########## be/src/cloud/cloud_cumulative_compaction_policy.h: ########## @@ -0,0 +1,73 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include <stddef.h> Review Comment: warning: inclusion of deprecated C++ header 'stddef.h'; consider using 'cstddef' instead [modernize-deprecated-headers] ```suggestion #include <cstddef> ``` ########## be/src/cloud/cloud_cumulative_compaction_policy.h: ########## @@ -0,0 +1,73 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include <stddef.h> +#include <stdint.h> + +#include <memory> +#include <string> +#include <vector> + +#include "common/config.h" +#include "cloud/cloud_tablet.h" +#include "olap/rowset/rowset.h" +#include "olap/rowset/rowset_meta.h" + +namespace doris { + +class Tablet; +struct Version; + +class CloudSizeBasedCumulativeCompactionPolicy { +public: + CloudSizeBasedCumulativeCompactionPolicy( + int64_t promotion_size = config::compaction_promotion_size_mbytes * 1024 * 1024, + double promotion_ratio = config::compaction_promotion_ratio, + int64_t promotion_min_size = config::compaction_promotion_min_size_mbytes * 1024 * 1024, + int64_t compaction_min_size = config::compaction_min_size_mbytes * 1024 * 1024); + + ~CloudSizeBasedCumulativeCompactionPolicy() {} Review Comment: warning: use '= default' to define a trivial destructor [modernize-use-equals-default] ```suggestion ~CloudSizeBasedCumulativeCompactionPolicy() = default; ``` ########## be/src/cloud/cloud_storage_engine.cpp: ########## @@ -190,4 +234,418 @@ void CloudStorageEngine::_sync_tablets_thread_callback() { } } +void CloudStorageEngine::get_cumu_compaction( Review Comment: warning: method 'get_cumu_compaction' can be made static [readability-convert-member-functions-to-static] ```suggestion static void CloudStorageEngine::get_cumu_compaction( ``` ########## be/src/cloud/cloud_storage_engine.cpp: ########## @@ -190,4 +234,418 @@ } } +void CloudStorageEngine::get_cumu_compaction( + int64_t tablet_id, std::vector<std::shared_ptr<CloudCumulativeCompaction>>& res) { + 
std::lock_guard lock(_compaction_mtx); + if (auto it = _submitted_cumu_compactions.find(tablet_id); + it != _submitted_cumu_compactions.end()) { + res = it->second; + } +} + +void CloudStorageEngine::_adjust_compaction_thread_num() { + int base_thread_num = get_base_thread_num(); + if (_base_compaction_thread_pool->max_threads() != base_thread_num) { + int old_max_threads = _base_compaction_thread_pool->max_threads(); + Status status = _base_compaction_thread_pool->set_max_threads(base_thread_num); + if (status.ok()) { + VLOG_NOTICE << "update base compaction thread pool max_threads from " << old_max_threads + << " to " << base_thread_num; + } + } + if (_base_compaction_thread_pool->min_threads() != base_thread_num) { + int old_min_threads = _base_compaction_thread_pool->min_threads(); + Status status = _base_compaction_thread_pool->set_min_threads(base_thread_num); + if (status.ok()) { + VLOG_NOTICE << "update base compaction thread pool min_threads from " << old_min_threads + << " to " << base_thread_num; + } + } + + int cumu_thread_num = get_cumu_thread_num(); + if (_cumu_compaction_thread_pool->max_threads() != cumu_thread_num) { + int old_max_threads = _cumu_compaction_thread_pool->max_threads(); + Status status = _cumu_compaction_thread_pool->set_max_threads(cumu_thread_num); + if (status.ok()) { + VLOG_NOTICE << "update cumu compaction thread pool max_threads from " << old_max_threads + << " to " << cumu_thread_num; + } + } + if (_cumu_compaction_thread_pool->min_threads() != cumu_thread_num) { + int old_min_threads = _cumu_compaction_thread_pool->min_threads(); + Status status = _cumu_compaction_thread_pool->set_min_threads(cumu_thread_num); + if (status.ok()) { + VLOG_NOTICE << "update cumu compaction thread pool min_threads from " << old_min_threads + << " to " << cumu_thread_num; + } + } +} + +void CloudStorageEngine::_compaction_tasks_producer_callback() { + LOG(INFO) << "try to start compaction producer process!"; + + int round = 0; + CompactionType 
compaction_type; + + // Used to record the time when the score metric was last updated. + // The update of the score metric is accompanied by the logic of selecting the tablet. + // If there is no slot available, the logic of selecting the tablet will be terminated, + // which causes the score metric update to be terminated. + // In order to avoid this situation, we need to update the score regularly. + int64_t last_cumulative_score_update_time = 0; + int64_t last_base_score_update_time = 0; + static const int64_t check_score_interval_ms = 5000; // 5 secs + + int64_t interval = config::generate_compaction_tasks_interval_ms; + do { + if (!config::disable_auto_compaction) { + _adjust_compaction_thread_num(); + + bool check_score = false; + int64_t cur_time = UnixMillis(); + if (round < config::cumulative_compaction_rounds_for_each_base_compaction_round) { + compaction_type = CompactionType::CUMULATIVE_COMPACTION; + round++; + if (cur_time - last_cumulative_score_update_time >= check_score_interval_ms) { + check_score = true; + last_cumulative_score_update_time = cur_time; + } + } else { + compaction_type = CompactionType::BASE_COMPACTION; + round = 0; + if (cur_time - last_base_score_update_time >= check_score_interval_ms) { + check_score = true; + last_base_score_update_time = cur_time; + } + } + std::unique_ptr<ThreadPool>& thread_pool = + (compaction_type == CompactionType::CUMULATIVE_COMPACTION) + ? _cumu_compaction_thread_pool + : _base_compaction_thread_pool; + VLOG_CRITICAL << "compaction thread pool. type: " + << (compaction_type == CompactionType::CUMULATIVE_COMPACTION ? 
"CUMU" + : "BASE") + << ", num_threads: " << thread_pool->num_threads() + << ", num_threads_pending_start: " + << thread_pool->num_threads_pending_start() + << ", num_active_threads: " << thread_pool->num_active_threads() + << ", max_threads: " << thread_pool->max_threads() + << ", min_threads: " << thread_pool->min_threads() + << ", num_total_queued_tasks: " << thread_pool->get_queue_size(); + std::vector<CloudTabletSPtr> tablets_compaction = + _generate_cloud_compaction_tasks(compaction_type, check_score); + + /// Regardless of whether the tablet is submitted for compaction or not, + /// we need to call 'reset_compaction' to clean up the base_compaction or cumulative_compaction objects + /// in the tablet, because these two objects store the tablet's own shared_ptr. + /// If it is not cleaned up, the reference count of the tablet will always be greater than 1, + /// thus cannot be collected by the garbage collector. (TabletManager::start_trash_sweep) + for (const auto& tablet : tablets_compaction) { + Status st = submit_compaction_task(tablet, compaction_type); + if (st.ok()) continue; + if ((!st.is<ErrorCode::BE_NO_SUITABLE_VERSION>() && + !st.is<ErrorCode::CUMULATIVE_NO_SUITABLE_VERSION>()) || + VLOG_DEBUG_IS_ON) { + LOG(WARNING) << "failed to submit compaction task for tablet: " + << tablet->tablet_id() << ", err: " << st; + } + } + interval = config::generate_compaction_tasks_interval_ms; + } else { + interval = config::check_auto_compaction_interval_seconds * 1000; + } + } while (!_stop_background_threads_latch.wait_for(std::chrono::milliseconds(interval))); +} + +std::vector<CloudTabletSPtr> CloudStorageEngine::_generate_cloud_compaction_tasks( + CompactionType compaction_type, bool check_score) { + std::vector<std::shared_ptr<CloudTablet>> tablets_compaction; + + int64_t max_compaction_score = 0; + std::unordered_set<int64_t> tablet_preparing_cumu_compaction; + std::unordered_map<int64_t, std::vector<std::shared_ptr<CloudCumulativeCompaction>>> + 
submitted_cumu_compactions; + std::unordered_map<int64_t, std::shared_ptr<CloudBaseCompaction>> submitted_base_compactions; + std::unordered_map<int64_t, std::shared_ptr<CloudFullCompaction>> submitted_full_compactions; + { + std::lock_guard lock(_compaction_mtx); + tablet_preparing_cumu_compaction = _tablet_preparing_cumu_compaction; + submitted_cumu_compactions = _submitted_cumu_compactions; + submitted_base_compactions = _submitted_base_compactions; + submitted_full_compactions = _submitted_full_compactions; + } + + bool need_pick_tablet = true; + int thread_per_disk = + config::compaction_task_num_per_fast_disk; // all disks are fast in cloud mode + int num_cumu = + std::accumulate(submitted_cumu_compactions.begin(), submitted_cumu_compactions.end(), 0, + [](int a, auto& b) { return a + b.second.size(); }); + int num_base = submitted_base_compactions.size() + submitted_full_compactions.size(); + int n = thread_per_disk - num_cumu - num_base; + if (compaction_type == CompactionType::BASE_COMPACTION) { + // We need to reserve at least one thread for cumulative compaction, + // because base compactions may take too long to complete, which may + // leads to "too many rowsets" error. 
+ int base_n = std::min(config::max_base_compaction_task_num_per_disk, thread_per_disk - 1) - + num_base; + n = std::min(base_n, n); + } + if (n <= 0) { // No threads available + if (!check_score) return tablets_compaction; + need_pick_tablet = false; + n = 0; + } + + // Return true for skipping compaction + std::function<bool(CloudTablet*)> filter_out; + if (compaction_type == CompactionType::BASE_COMPACTION) { + filter_out = [&submitted_base_compactions, &submitted_full_compactions](CloudTablet* t) { + return !!submitted_base_compactions.count(t->tablet_id()) || + !!submitted_full_compactions.count(t->tablet_id()) || + t->tablet_state() != TABLET_RUNNING; + }; + } else if (config::enable_parallel_cumu_compaction) { + filter_out = [&tablet_preparing_cumu_compaction](CloudTablet* t) { + return !!tablet_preparing_cumu_compaction.count(t->tablet_id()) || + t->tablet_state() != TABLET_RUNNING; + }; + } else { + filter_out = [&tablet_preparing_cumu_compaction, &submitted_cumu_compactions](CloudTablet* t) { + return !!tablet_preparing_cumu_compaction.count(t->tablet_id()) || + !!submitted_cumu_compactions.count(t->tablet_id()) || + t->tablet_state() != TABLET_RUNNING; + }; + } + + // Even if need_pick_tablet is false, we still need to call find_best_tablet_to_compaction(), + // So that we can update the max_compaction_score metric. + do { + std::vector<CloudTabletSPtr> tablets; + auto st = tablet_mgr().get_topn_tablets_to_compact(n, compaction_type, filter_out, + &tablets, &max_compaction_score); + if (!st.ok()) { + LOG(WARNING) << "failed to get tablets to compact, err=" << st; + break; + } + if (!need_pick_tablet) break; Review Comment: warning: statement should be inside braces [readability-braces-around-statements] ```suggestion if (!need_pick_tablet) { break; } ``` ########## be/src/cloud/cloud_cumulative_compaction_policy.h: ########## @@ -0,0 +1,73 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include <stddef.h> +#include <stdint.h> + +#include <memory> +#include <string> +#include <vector> + +#include "common/config.h" +#include "cloud/cloud_tablet.h" +#include "olap/rowset/rowset.h" +#include "olap/rowset/rowset_meta.h" + +namespace doris { + +class Tablet; +struct Version; + +class CloudSizeBasedCumulativeCompactionPolicy { +public: + CloudSizeBasedCumulativeCompactionPolicy( + int64_t promotion_size = config::compaction_promotion_size_mbytes * 1024 * 1024, + double promotion_ratio = config::compaction_promotion_ratio, + int64_t promotion_min_size = config::compaction_promotion_min_size_mbytes * 1024 * 1024, + int64_t compaction_min_size = config::compaction_min_size_mbytes * 1024 * 1024); + + ~CloudSizeBasedCumulativeCompactionPolicy() {} + + int64_t new_cumulative_point(CloudTablet* tablet, const RowsetSharedPtr& output_rowset, + Version& last_delete_version, + int64_t last_cumulative_point); + + int pick_input_rowsets(CloudTablet* tablet, const std::vector<RowsetSharedPtr>& candidate_rowsets, + const int64_t max_compaction_score, const int64_t min_compaction_score, + std::vector<RowsetSharedPtr>* input_rowsets, + Version* last_delete_version, size_t* compaction_score, + bool allow_delete = false); + +private: + int64_t _level_size(const 
int64_t size); Review Comment: warning: parameter 'size' is const-qualified in the function declaration; const-qualification of parameters only has an effect in function definitions [readability-avoid-const-params-in-decls] ```suggestion int64_t _level_size(int64_t size); ``` ########## be/src/cloud/cloud_storage_engine.cpp: ########## @@ -190,4 +234,418 @@ } } +void CloudStorageEngine::get_cumu_compaction( + int64_t tablet_id, std::vector<std::shared_ptr<CloudCumulativeCompaction>>& res) { + std::lock_guard lock(_compaction_mtx); + if (auto it = _submitted_cumu_compactions.find(tablet_id); + it != _submitted_cumu_compactions.end()) { + res = it->second; + } +} + +void CloudStorageEngine::_adjust_compaction_thread_num() { + int base_thread_num = get_base_thread_num(); + if (_base_compaction_thread_pool->max_threads() != base_thread_num) { + int old_max_threads = _base_compaction_thread_pool->max_threads(); + Status status = _base_compaction_thread_pool->set_max_threads(base_thread_num); + if (status.ok()) { + VLOG_NOTICE << "update base compaction thread pool max_threads from " << old_max_threads + << " to " << base_thread_num; + } + } + if (_base_compaction_thread_pool->min_threads() != base_thread_num) { + int old_min_threads = _base_compaction_thread_pool->min_threads(); + Status status = _base_compaction_thread_pool->set_min_threads(base_thread_num); + if (status.ok()) { + VLOG_NOTICE << "update base compaction thread pool min_threads from " << old_min_threads + << " to " << base_thread_num; + } + } + + int cumu_thread_num = get_cumu_thread_num(); + if (_cumu_compaction_thread_pool->max_threads() != cumu_thread_num) { + int old_max_threads = _cumu_compaction_thread_pool->max_threads(); + Status status = _cumu_compaction_thread_pool->set_max_threads(cumu_thread_num); + if (status.ok()) { + VLOG_NOTICE << "update cumu compaction thread pool max_threads from " << old_max_threads + << " to " << cumu_thread_num; + } + } + if 
(_cumu_compaction_thread_pool->min_threads() != cumu_thread_num) { + int old_min_threads = _cumu_compaction_thread_pool->min_threads(); + Status status = _cumu_compaction_thread_pool->set_min_threads(cumu_thread_num); + if (status.ok()) { + VLOG_NOTICE << "update cumu compaction thread pool min_threads from " << old_min_threads + << " to " << cumu_thread_num; + } + } +} + +void CloudStorageEngine::_compaction_tasks_producer_callback() { + LOG(INFO) << "try to start compaction producer process!"; + + int round = 0; + CompactionType compaction_type; + + // Used to record the time when the score metric was last updated. + // The update of the score metric is accompanied by the logic of selecting the tablet. + // If there is no slot available, the logic of selecting the tablet will be terminated, + // which causes the score metric update to be terminated. + // In order to avoid this situation, we need to update the score regularly. + int64_t last_cumulative_score_update_time = 0; + int64_t last_base_score_update_time = 0; + static const int64_t check_score_interval_ms = 5000; // 5 secs + + int64_t interval = config::generate_compaction_tasks_interval_ms; + do { + if (!config::disable_auto_compaction) { + _adjust_compaction_thread_num(); + + bool check_score = false; + int64_t cur_time = UnixMillis(); + if (round < config::cumulative_compaction_rounds_for_each_base_compaction_round) { + compaction_type = CompactionType::CUMULATIVE_COMPACTION; + round++; + if (cur_time - last_cumulative_score_update_time >= check_score_interval_ms) { + check_score = true; + last_cumulative_score_update_time = cur_time; + } + } else { + compaction_type = CompactionType::BASE_COMPACTION; + round = 0; + if (cur_time - last_base_score_update_time >= check_score_interval_ms) { + check_score = true; + last_base_score_update_time = cur_time; + } + } + std::unique_ptr<ThreadPool>& thread_pool = + (compaction_type == CompactionType::CUMULATIVE_COMPACTION) + ? 
_cumu_compaction_thread_pool + : _base_compaction_thread_pool; + VLOG_CRITICAL << "compaction thread pool. type: " + << (compaction_type == CompactionType::CUMULATIVE_COMPACTION ? "CUMU" + : "BASE") + << ", num_threads: " << thread_pool->num_threads() + << ", num_threads_pending_start: " + << thread_pool->num_threads_pending_start() + << ", num_active_threads: " << thread_pool->num_active_threads() + << ", max_threads: " << thread_pool->max_threads() + << ", min_threads: " << thread_pool->min_threads() + << ", num_total_queued_tasks: " << thread_pool->get_queue_size(); + std::vector<CloudTabletSPtr> tablets_compaction = + _generate_cloud_compaction_tasks(compaction_type, check_score); + + /// Regardless of whether the tablet is submitted for compaction or not, + /// we need to call 'reset_compaction' to clean up the base_compaction or cumulative_compaction objects + /// in the tablet, because these two objects store the tablet's own shared_ptr. + /// If it is not cleaned up, the reference count of the tablet will always be greater than 1, + /// thus cannot be collected by the garbage collector. 
(TabletManager::start_trash_sweep) + for (const auto& tablet : tablets_compaction) { + Status st = submit_compaction_task(tablet, compaction_type); + if (st.ok()) continue; + if ((!st.is<ErrorCode::BE_NO_SUITABLE_VERSION>() && + !st.is<ErrorCode::CUMULATIVE_NO_SUITABLE_VERSION>()) || + VLOG_DEBUG_IS_ON) { + LOG(WARNING) << "failed to submit compaction task for tablet: " + << tablet->tablet_id() << ", err: " << st; + } + } + interval = config::generate_compaction_tasks_interval_ms; + } else { + interval = config::check_auto_compaction_interval_seconds * 1000; + } + } while (!_stop_background_threads_latch.wait_for(std::chrono::milliseconds(interval))); +} + +std::vector<CloudTabletSPtr> CloudStorageEngine::_generate_cloud_compaction_tasks( Review Comment: warning: function '_generate_cloud_compaction_tasks' exceeds recommended size/complexity thresholds [readability-function-size] ```cpp std::vector<CloudTabletSPtr> CloudStorageEngine::_generate_cloud_compaction_tasks( ^ ``` <details> <summary>Additional context</summary> **be/src/cloud/cloud_storage_engine.cpp:359:** 85 lines including whitespace and comments (threshold 80) ```cpp std::vector<CloudTabletSPtr> CloudStorageEngine::_generate_cloud_compaction_tasks( ^ ``` </details> ########## be/src/olap/compaction.cpp: ########## @@ -844,4 +847,80 @@ void Compaction::_load_segment_to_cache() { } } +void CloudCompactionMixin::build_basic_info() { Review Comment: warning: method 'build_basic_info' can be made static [readability-convert-member-functions-to-static] be/src/olap/compaction.h:184: ```diff - void build_basic_info(); + static void build_basic_info(); ``` (Reviewer note: this is a false positive — `build_basic_info()` assigns the instance members `_output_version`, `_newest_write_timestamp` and `_cur_tablet_schema` from `_input_rowsets` and `_tablet`, so it cannot be made `static`.) ########## be/src/olap/compaction.cpp: ########## @@ -844,4 +847,80 @@ } } +void CloudCompactionMixin::build_basic_info() { + _output_version = + Version(_input_rowsets.front()->start_version(), _input_rowsets.back()->end_version()); + + _newest_write_timestamp = _input_rowsets.back()->newest_write_timestamp(); + + std::vector<RowsetMetaSharedPtr>
rowset_metas(_input_rowsets.size()); + std::transform(_input_rowsets.begin(), _input_rowsets.end(), rowset_metas.begin(), + [](const RowsetSharedPtr& rowset) { return rowset->rowset_meta(); }); + _cur_tablet_schema = _tablet->tablet_schema_with_merged_max_schema_version(rowset_metas); +} + +int64_t CloudCompactionMixin::get_compaction_permits() { Review Comment: warning: method 'get_compaction_permits' can be made static [readability-convert-member-functions-to-static] be/src/olap/compaction.h:188: ```diff - int64_t get_compaction_permits(); + static int64_t get_compaction_permits(); ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
