github-actions[bot] commented on code in PR #31215: URL: https://github.com/apache/doris/pull/31215#discussion_r1497198972
########## be/src/cloud/cloud_base_compaction.cpp: ########## @@ -0,0 +1,387 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "cloud/cloud_base_compaction.h" + +#include <boost/container_hash/hash.hpp> + +#include "cloud/cloud_meta_mgr.h" +#include "cloud/config.h" +#include "common/config.h" +#include "common/sync_point.h" +#include "gen_cpp/cloud.pb.h" +#include "olap/compaction.h" +#include "olap/task/engine_checksum_task.h" +#include "service/backend_options.h" +#include "util/thread.h" +#include "util/uuid_generator.h" + +namespace doris { +using namespace ErrorCode; + +bvar::Adder<uint64_t> base_output_size("base_compaction", "output_size"); + +CloudBaseCompaction::CloudBaseCompaction(CloudStorageEngine& engine, CloudTabletSPtr tablet) + : CloudCompactionMixin(engine, std::move(tablet), + "BaseCompaction:" + std::to_string(tablet->tablet_id())) { + auto uuid = UUIDGenerator::instance()->next_uuid(); + std::stringstream ss; + ss << uuid; + _uuid = ss.str(); +} + +CloudBaseCompaction::~CloudBaseCompaction() = default; + +Status CloudBaseCompaction::prepare_compact() { + if (_tablet->tablet_state() != TABLET_RUNNING) { + return Status::InternalError("invalid tablet state. tablet_id={}", _tablet->tablet_id()); + } + + bool need_sync_tablet = true; + { + std::shared_lock rlock(_tablet->get_header_lock()); + // If number of rowsets is equal to approximate_num_rowsets, it is very likely that this tablet has been + // synchronized with meta-service. + if (_tablet->tablet_meta()->all_rs_metas().size() >= + cloud_tablet()->fetch_add_approximate_num_rowsets(0) && + cloud_tablet()->last_sync_time_s > 0) { + need_sync_tablet = false; + } + } + if (need_sync_tablet) { + RETURN_IF_ERROR(cloud_tablet()->sync_rowsets()); + } + + RETURN_IF_ERROR(pick_rowsets_to_compact()); + + // prepare compaction job + cloud::TabletJobInfoPB job; + auto idx = job.mutable_idx(); + idx->set_tablet_id(_tablet->tablet_id()); + idx->set_table_id(_tablet->table_id()); + idx->set_index_id(_tablet->index_id()); + idx->set_partition_id(_tablet->partition_id()); + auto compaction_job = job.add_compaction(); + compaction_job->set_id(_uuid); + compaction_job->set_initiator(BackendOptions::get_localhost() + ':' + + std::to_string(config::heartbeat_service_port)); + compaction_job->set_type(cloud::TabletCompactionJobPB::BASE); + compaction_job->set_base_compaction_cnt(_base_compaction_cnt); + compaction_job->set_cumulative_compaction_cnt(_cumulative_compaction_cnt); + using namespace std::chrono; + int64_t now = duration_cast<seconds>(system_clock::now().time_since_epoch()).count(); + _expiration = now + config::compaction_timeout_seconds; + compaction_job->set_expiration(_expiration); + compaction_job->set_lease(now + config::lease_compaction_interval_seconds * 4); + cloud::StartTabletJobResponse resp; + //auto st = cloud::meta_mgr()->prepare_tablet_job(job, &resp); + auto st = _engine.meta_mgr().prepare_tablet_job(job, &resp); + if (!st.ok()) { + if (resp.status().code() == cloud::STALE_TABLET_CACHE) { + // set last_sync_time to 0 to force sync tablet next time + cloud_tablet()->last_sync_time_s = 0; + } else if (resp.status().code() == cloud::TABLET_NOT_FOUND) { + // tablet not found + cloud_tablet()->recycle_cached_data(); + } + return st; + } + + for (auto& rs : _input_rowsets) { + _input_row_num += rs->num_rows(); + _input_segments += rs->num_segments(); + _input_rowsets_size += rs->data_disk_size(); + } + LOG_INFO("start CloudBaseCompaction, tablet_id={}, range=[{}-{}]", _tablet->tablet_id(), + _input_rowsets.front()->start_version(), _input_rowsets.back()->end_version()) + .tag("job_id", _uuid) + .tag("input_rowsets", _input_rowsets.size()) + .tag("input_rows", _input_row_num) + .tag("input_segments", _input_segments) + .tag("input_data_size", _input_rowsets_size); + return st; +} + +void CloudBaseCompaction::_filter_input_rowset() { Review Comment: warning: method '_filter_input_rowset' can be made static [readability-convert-member-functions-to-static] be/src/cloud/cloud_base_compaction.h:46: ```diff - void _filter_input_rowset(); + static void _filter_input_rowset(); ``` ########## be/src/cloud/cloud_cumulative_compaction.cpp: ########## @@ -0,0 +1,477 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "cloud/cloud_cumulative_compaction.h" + +#include "cloud/cloud_meta_mgr.h" +#include "cloud/config.h" +#include "common/config.h" +#include "common/logging.h" +#include "common/status.h" +#include "common/sync_point.h" +#include "gen_cpp/cloud.pb.h" +#include "olap/compaction.h" +#include "olap/cumulative_compaction_policy.h" +#include "service/backend_options.h" +#include "util/trace.h" +#include "util/uuid_generator.h" + +namespace doris { +using namespace ErrorCode; + +bvar::Adder<uint64_t> cumu_output_size("cumu_compaction", "output_size"); + +CloudCumulativeCompaction::CloudCumulativeCompaction(CloudStorageEngine& engine, + CloudTabletSPtr tablet) + : CloudCompactionMixin(engine, std::move(tablet), + "BaseCompaction:" + std::to_string(tablet->tablet_id())) { + auto uuid = UUIDGenerator::instance()->next_uuid(); + std::stringstream ss; + ss << uuid; + _uuid = ss.str(); +} + +CloudCumulativeCompaction::~CloudCumulativeCompaction() = default; + +Status CloudCumulativeCompaction::prepare_compact() { Review Comment: warning: function 'prepare_compact' exceeds recommended size/complexity thresholds [readability-function-size] ```cpp Status CloudCumulativeCompaction::prepare_compact() { ^ ``` <details> <summary>Additional context</summary> **be/src/cloud/cloud_cumulative_compaction.cpp:49:** 115 lines including whitespace and comments (threshold 80) ```cpp Status CloudCumulativeCompaction::prepare_compact() { ^ ``` </details> ########## be/src/cloud/cloud_cumulative_compaction.cpp: ########## @@ -0,0 +1,477 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "cloud/cloud_cumulative_compaction.h" + +#include "cloud/cloud_meta_mgr.h" +#include "cloud/config.h" +#include "common/config.h" +#include "common/logging.h" +#include "common/status.h" +#include "common/sync_point.h" +#include "gen_cpp/cloud.pb.h" +#include "olap/compaction.h" +#include "olap/cumulative_compaction_policy.h" +#include "service/backend_options.h" +#include "util/trace.h" +#include "util/uuid_generator.h" + +namespace doris { +using namespace ErrorCode; + +bvar::Adder<uint64_t> cumu_output_size("cumu_compaction", "output_size"); + +CloudCumulativeCompaction::CloudCumulativeCompaction(CloudStorageEngine& engine, + CloudTabletSPtr tablet) + : CloudCompactionMixin(engine, std::move(tablet), + "BaseCompaction:" + std::to_string(tablet->tablet_id())) { + auto uuid = UUIDGenerator::instance()->next_uuid(); + std::stringstream ss; + ss << uuid; + _uuid = ss.str(); +} + +CloudCumulativeCompaction::~CloudCumulativeCompaction() = default; + +Status CloudCumulativeCompaction::prepare_compact() { + if (_tablet->tablet_state() != TABLET_RUNNING) { + return Status::InternalError("invalid tablet state. tablet_id={}", _tablet->tablet_id()); + } + + std::vector<std::shared_ptr<CloudCumulativeCompaction>> cumu_compactions; + _engine.get_cumu_compaction(_tablet->tablet_id(), cumu_compactions); + if (!cumu_compactions.empty()) { + for (auto& cumu : cumu_compactions) { + _max_conflict_version = + std::max(_max_conflict_version, cumu->_input_rowsets.back()->end_version()); + } + } + + int tried = 0; +PREPARE_TRY_AGAIN: + + bool need_sync_tablet = true; + { + std::shared_lock rlock(_tablet->get_header_lock()); + // If number of rowsets is equal to approximate_num_rowsets, it is very likely that this tablet has been + // synchronized with meta-service. + if (_tablet->tablet_meta()->all_rs_metas().size() >= + cloud_tablet()->fetch_add_approximate_num_rowsets(0) && + cloud_tablet()->last_sync_time_s > 0) { + need_sync_tablet = false; + } + } + if (need_sync_tablet) { + RETURN_IF_ERROR(cloud_tablet()->sync_rowsets()); + } + + // pick rowsets to compact + auto st = pick_rowsets_to_compact(); + if (!st.ok()) { + if (tried == 0 && _last_delete_version.first != -1) { + // we meet a delete version, should increase the cumulative point to let base compaction handle the delete version. + // plus 1 to skip the delete version. + // NOTICE: after that, the cumulative point may be larger than max version of this tablet, but it doesn't matter. + update_cumulative_point(); + } + return st; + } + + // prepare compaction job + cloud::TabletJobInfoPB job; + auto idx = job.mutable_idx(); + idx->set_tablet_id(_tablet->tablet_id()); + idx->set_table_id(_tablet->table_id()); + idx->set_index_id(_tablet->index_id()); + idx->set_partition_id(_tablet->partition_id()); + auto compaction_job = job.add_compaction(); + compaction_job->set_id(_uuid); + compaction_job->set_initiator(BackendOptions::get_localhost() + ':' + + std::to_string(config::heartbeat_service_port)); + compaction_job->set_type(cloud::TabletCompactionJobPB::CUMULATIVE); + compaction_job->set_base_compaction_cnt(_base_compaction_cnt); + compaction_job->set_cumulative_compaction_cnt(_cumulative_compaction_cnt); + using namespace std::chrono; + int64_t now = duration_cast<seconds>(system_clock::now().time_since_epoch()).count(); + _expiration = now + config::compaction_timeout_seconds; + compaction_job->set_expiration(_expiration); + compaction_job->set_lease(now + config::lease_compaction_interval_seconds * 4); + if (config::enable_parallel_cumu_compaction) { + // Set input version range to let meta-service judge version range conflict + compaction_job->add_input_versions(_input_rowsets.front()->start_version()); + compaction_job->add_input_versions(_input_rowsets.back()->end_version()); + } + cloud::StartTabletJobResponse resp; + st = _engine.meta_mgr().prepare_tablet_job(job, &resp); + if (!st.ok()) { + if (resp.status().code() == cloud::STALE_TABLET_CACHE) { + // set last_sync_time to 0 to force sync tablet next time + cloud_tablet()->last_sync_time_s = 0; + } else if (resp.status().code() == cloud::TABLET_NOT_FOUND) { + // tablet not found + cloud_tablet()->recycle_cached_data(); + } else if (resp.status().code() == cloud::JOB_TABLET_BUSY) { + if (config::enable_parallel_cumu_compaction && resp.version_in_compaction_size() > 0 && + ++tried <= 2) { + _max_conflict_version = *std::max_element(resp.version_in_compaction().begin(), + resp.version_in_compaction().end()); + LOG_INFO("retry pick input rowsets") + .tag("job_id", _uuid) + .tag("max_conflict_version", _max_conflict_version) + .tag("tried", tried) + .tag("msg", resp.status().msg()); + goto PREPARE_TRY_AGAIN; + } else { + LOG_WARNING("failed to prepare cumu compaction") + .tag("job_id", _uuid) + .tag("msg", resp.status().msg()); + return Status::Error<CUMULATIVE_NO_SUITABLE_VERSION>("no suitable versions"); + } + } + return st; + } + + for (auto& rs : _input_rowsets) { + _input_row_num += rs->num_rows(); + _input_segments += rs->num_segments(); + _input_rowsets_size += rs->data_disk_size(); + } + LOG_INFO("start CloudCumulativeCompaction, tablet_id={}, range=[{}-{}]", _tablet->tablet_id(), + _input_rowsets.front()->start_version(), _input_rowsets.back()->end_version()) + .tag("job_id", _uuid) + .tag("input_rowsets", _input_rowsets.size()) + .tag("input_rows", _input_row_num) + .tag("input_segments", _input_segments) + .tag("input_data_size", _input_rowsets_size) + .tag("tablet_max_version", cloud_tablet()->max_version_unlocked()) + .tag("cumulative_point", cloud_tablet()->cumulative_layer_point()) + .tag("num_rowsets", cloud_tablet()->fetch_add_approximate_num_rowsets(0)) + .tag("cumu_num_rowsets", cloud_tablet()->fetch_add_approximate_cumu_num_rowsets(0)); + return st; +} + +Status CloudCumulativeCompaction::execute_compact() { + TEST_SYNC_POINT_RETURN_WITH_VALUE("CloudCumulativeCompaction::execute_compact_impl", + Status::OK(), this); + using namespace std::chrono; + auto start = steady_clock::now(); + RETURN_IF_ERROR(CloudCompactionMixin::execute_compact()); + LOG_INFO("finish CloudCumulativeCompaction, tablet_id={}, cost={}ms", _tablet->tablet_id(), + duration_cast<milliseconds>(steady_clock::now() - start).count()) + .tag("job_id", _uuid) + .tag("input_rowsets", _input_rowsets.size()) + .tag("input_rows", _input_row_num) + .tag("input_segments", _input_segments) + .tag("input_data_size", _input_rowsets_size) + .tag("output_rows", _output_rowset->num_rows()) + .tag("output_segments", _output_rowset->num_segments()) + .tag("output_data_size", _output_rowset->data_disk_size()) + .tag("tablet_max_version", _tablet->max_version_unlocked()) + .tag("cumulative_point", cloud_tablet()->cumulative_layer_point()) + .tag("num_rowsets", cloud_tablet()->fetch_add_approximate_num_rowsets(0)) + .tag("cumu_num_rowsets", cloud_tablet()->fetch_add_approximate_cumu_num_rowsets(0)); + + _state = CompactionState::SUCCESS; + + DorisMetrics::instance()->cumulative_compaction_deltas_total->increment(_input_rowsets.size()); + DorisMetrics::instance()->cumulative_compaction_bytes_total->increment(_input_rowsets_size); + cumu_output_size << _output_rowset->data_disk_size(); + + return Status::OK(); +} + +Status CloudCumulativeCompaction::modify_rowsets() { Review Comment: warning: function 'modify_rowsets' exceeds recommended size/complexity thresholds [readability-function-size] ```cpp Status CloudCumulativeCompaction::modify_rowsets() { ^ ``` <details> <summary>Additional context</summary> **be/src/cloud/cloud_cumulative_compaction.cpp:196:** 102 lines including whitespace and comments (threshold 80) ```cpp Status CloudCumulativeCompaction::modify_rowsets() { ^ ``` </details> ########## be/src/cloud/cloud_base_compaction.cpp: ########## @@ -0,0 +1,387 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "cloud/cloud_base_compaction.h" + +#include <boost/container_hash/hash.hpp> + +#include "cloud/cloud_meta_mgr.h" +#include "cloud/config.h" +#include "common/config.h" +#include "common/sync_point.h" +#include "gen_cpp/cloud.pb.h" +#include "olap/compaction.h" +#include "olap/task/engine_checksum_task.h" +#include "service/backend_options.h" +#include "util/thread.h" +#include "util/uuid_generator.h" + +namespace doris { +using namespace ErrorCode; + +bvar::Adder<uint64_t> base_output_size("base_compaction", "output_size"); + +CloudBaseCompaction::CloudBaseCompaction(CloudStorageEngine& engine, CloudTabletSPtr tablet) + : CloudCompactionMixin(engine, std::move(tablet), + "BaseCompaction:" + std::to_string(tablet->tablet_id())) { + auto uuid = UUIDGenerator::instance()->next_uuid(); + std::stringstream ss; + ss << uuid; + _uuid = ss.str(); +} + +CloudBaseCompaction::~CloudBaseCompaction() = default; + +Status CloudBaseCompaction::prepare_compact() { + if (_tablet->tablet_state() != TABLET_RUNNING) { + return Status::InternalError("invalid tablet state. tablet_id={}", _tablet->tablet_id()); + } + + bool need_sync_tablet = true; + { + std::shared_lock rlock(_tablet->get_header_lock()); + // If number of rowsets is equal to approximate_num_rowsets, it is very likely that this tablet has been + // synchronized with meta-service. + if (_tablet->tablet_meta()->all_rs_metas().size() >= + cloud_tablet()->fetch_add_approximate_num_rowsets(0) && + cloud_tablet()->last_sync_time_s > 0) { + need_sync_tablet = false; + } + } + if (need_sync_tablet) { + RETURN_IF_ERROR(cloud_tablet()->sync_rowsets()); + } + + RETURN_IF_ERROR(pick_rowsets_to_compact()); + + // prepare compaction job + cloud::TabletJobInfoPB job; + auto idx = job.mutable_idx(); + idx->set_tablet_id(_tablet->tablet_id()); + idx->set_table_id(_tablet->table_id()); + idx->set_index_id(_tablet->index_id()); + idx->set_partition_id(_tablet->partition_id()); + auto compaction_job = job.add_compaction(); + compaction_job->set_id(_uuid); + compaction_job->set_initiator(BackendOptions::get_localhost() + ':' + + std::to_string(config::heartbeat_service_port)); + compaction_job->set_type(cloud::TabletCompactionJobPB::BASE); + compaction_job->set_base_compaction_cnt(_base_compaction_cnt); + compaction_job->set_cumulative_compaction_cnt(_cumulative_compaction_cnt); + using namespace std::chrono; + int64_t now = duration_cast<seconds>(system_clock::now().time_since_epoch()).count(); + _expiration = now + config::compaction_timeout_seconds; + compaction_job->set_expiration(_expiration); + compaction_job->set_lease(now + config::lease_compaction_interval_seconds * 4); + cloud::StartTabletJobResponse resp; + //auto st = cloud::meta_mgr()->prepare_tablet_job(job, &resp); + auto st = _engine.meta_mgr().prepare_tablet_job(job, &resp); + if (!st.ok()) { + if (resp.status().code() == cloud::STALE_TABLET_CACHE) { + // set last_sync_time to 0 to force sync tablet next time + cloud_tablet()->last_sync_time_s = 0; + } else if (resp.status().code() == cloud::TABLET_NOT_FOUND) { + // tablet not found + cloud_tablet()->recycle_cached_data(); + } + return st; + } + + for (auto& rs : _input_rowsets) { + _input_row_num += rs->num_rows(); + _input_segments += rs->num_segments(); + _input_rowsets_size += rs->data_disk_size(); + } + LOG_INFO("start CloudBaseCompaction, tablet_id={}, range=[{}-{}]", _tablet->tablet_id(), + _input_rowsets.front()->start_version(), _input_rowsets.back()->end_version()) + .tag("job_id", _uuid) + .tag("input_rowsets", _input_rowsets.size()) + .tag("input_rows", _input_row_num) + .tag("input_segments", _input_segments) + .tag("input_data_size", _input_rowsets_size); + return st; +} + +void CloudBaseCompaction::_filter_input_rowset() { + // if dup_key and no delete predicate + // we skip big files to save resources + if (_tablet->keys_type() != KeysType::DUP_KEYS) { + return; + } + for (auto& rs : _input_rowsets) { + if (rs->rowset_meta()->has_delete_predicate()) { + return; + } + } + int64_t max_size = config::base_compaction_dup_key_max_file_size_mbytes * 1024 * 1024; + // first find a proper rowset for start + auto rs_iter = _input_rowsets.begin(); + while (rs_iter != _input_rowsets.end()) { + if ((*rs_iter)->rowset_meta()->total_disk_size() >= max_size) { + rs_iter = _input_rowsets.erase(rs_iter); + } else { + break; + } + } +} + +Status CloudBaseCompaction::pick_rowsets_to_compact() { + _input_rowsets.clear(); + { + std::shared_lock rlock(_tablet->get_header_lock()); + _base_compaction_cnt = cloud_tablet()->base_compaction_cnt(); + _cumulative_compaction_cnt = cloud_tablet()->cumulative_compaction_cnt(); + _input_rowsets = cloud_tablet()->pick_candidate_rowsets_to_base_compaction(); + } + if (auto st = check_version_continuity(_input_rowsets); !st.ok()) { + DCHECK(false) << st; + return st; + } + _filter_input_rowset(); + if (_input_rowsets.size() <= 1) { + return Status::Error<BE_NO_SUITABLE_VERSION>( + "insuffient compation input rowset, #rowsets={}", _input_rowsets.size()); + } + + if (_input_rowsets.size() == 2 && _input_rowsets[0]->end_version() == 1) { + // the tablet is with rowset: [0-1], [2-y] + // and [0-1] has no data. in this situation, no need to do base compaction. + return Status::Error<BE_NO_SUITABLE_VERSION>("no suitable versions for compaction"); + } + + // 1. cumulative rowset must reach base_compaction_min_rowset_num threshold + if (_input_rowsets.size() > config::base_compaction_min_rowset_num) { + VLOG_NOTICE << "satisfy the base compaction policy. tablet=" << _tablet->tablet_id() + << ", num_cumulative_rowsets=" << _input_rowsets.size() - 1 + << ", base_compaction_num_cumulative_rowsets=" + << config::base_compaction_min_rowset_num; + return Status::OK(); + } + + // 2. the ratio between base rowset and all input cumulative rowsets reaches the threshold + // `_input_rowsets` has been sorted by end version, so we consider `_input_rowsets[0]` is the base rowset. + int64_t base_size = _input_rowsets.front()->data_disk_size(); + int64_t cumulative_total_size = 0; + for (auto it = _input_rowsets.begin() + 1; it != _input_rowsets.end(); ++it) { + cumulative_total_size += (*it)->data_disk_size(); + } + + double base_cumulative_delta_ratio = config::base_compaction_min_data_ratio; + if (base_size == 0) { + // base_size == 0 means this may be a base version [0-1], which has no data. + // set to 1 to void divide by zero + base_size = 1; + } + double cumulative_base_ratio = static_cast<double>(cumulative_total_size) / base_size; + + if (cumulative_base_ratio > base_cumulative_delta_ratio) { + VLOG_NOTICE << "satisfy the base compaction policy. tablet=" << _tablet->tablet_id() + << ", cumulative_total_size=" << cumulative_total_size + << ", base_size=" << base_size + << ", cumulative_base_ratio=" << cumulative_base_ratio + << ", policy_ratio=" << base_cumulative_delta_ratio; + return Status::OK(); + } + + // 3. the interval since last base compaction reaches the threshold + int64_t base_creation_time = _input_rowsets[0]->creation_time(); + int64_t interval_threshold = config::base_compaction_interval_seconds_since_last_operation; + int64_t interval_since_last_base_compaction = time(nullptr) - base_creation_time; + if (interval_since_last_base_compaction > interval_threshold) { + VLOG_NOTICE << "satisfy the base compaction policy. tablet=" << _tablet->tablet_id() + << ", interval_since_last_base_compaction=" + << interval_since_last_base_compaction + << ", interval_threshold=" << interval_threshold; + return Status::OK(); + } + + VLOG_NOTICE << "don't satisfy the base compaction policy. tablet=" << _tablet->tablet_id() + << ", num_cumulative_rowsets=" << _input_rowsets.size() - 1 + << ", cumulative_base_ratio=" << cumulative_base_ratio + << ", interval_since_last_base_compaction=" << interval_since_last_base_compaction; + return Status::Error<BE_NO_SUITABLE_VERSION>("no suitable versions for compaction"); +} + +Status CloudBaseCompaction::execute_compact() { + if (config::enable_base_compaction_idle_sched) { + Thread::set_idle_sched(); + } + + SCOPED_ATTACH_TASK(_mem_tracker); + + using namespace std::chrono; + auto start = steady_clock::now(); + RETURN_IF_ERROR(CloudCompactionMixin::execute_compact()); + LOG_INFO("finish CloudBaseCompaction, tablet_id={}, cost={}ms", _tablet->tablet_id(), + duration_cast<milliseconds>(steady_clock::now() - start).count()) + .tag("job_id", _uuid) + .tag("input_rowsets", _input_rowsets.size()) + .tag("input_rows", _input_row_num) + .tag("input_segments", _input_segments) + .tag("input_data_size", _input_rowsets_size) + .tag("output_rows", _output_rowset->num_rows()) + .tag("output_segments", _output_rowset->num_segments()) + .tag("output_data_size", _output_rowset->data_disk_size()); + + //_compaction_succeed = true; + _state = CompactionState::SUCCESS; + + DorisMetrics::instance()->base_compaction_deltas_total->increment(_input_rowsets.size()); + DorisMetrics::instance()->base_compaction_bytes_total->increment(_input_rowsets_size); + base_output_size << _output_rowset->data_disk_size(); + + return Status::OK(); +} + +Status CloudBaseCompaction::modify_rowsets() { Review Comment: warning: function 'modify_rowsets' exceeds recommended size/complexity thresholds [readability-function-size] ```cpp Status CloudBaseCompaction::modify_rowsets() { ^ ``` <details> <summary>Additional context</summary> **be/src/cloud/cloud_base_compaction.cpp:249:** 82 lines including whitespace and comments (threshold 80) ```cpp Status CloudBaseCompaction::modify_rowsets() { ^ ``` </details> -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
