This is an automated email from the ASF dual-hosted git repository. dataroaring pushed a commit to branch branch-4.0-preview in repository https://gitbox.apache.org/repos/asf/doris.git
commit 241981ec31dc72520aff0184c014aa5d38e6e77f Author: zzzxl <[email protected]> AuthorDate: Wed Apr 10 17:56:13 2024 +0800 [fix](inverted index) cloud mod supports time series (#33414) --- be/src/cloud/cloud_cumulative_compaction.cpp | 17 ++- .../cloud/cloud_cumulative_compaction_policy.cpp | 150 ++++++++++++++++++- be/src/cloud/cloud_cumulative_compaction_policy.h | 63 ++++++-- be/src/cloud/cloud_rowset_builder.cpp | 4 + be/src/cloud/cloud_rowset_writer.cpp | 1 + be/src/cloud/cloud_storage_engine.cpp | 18 ++- be/src/cloud/cloud_storage_engine.h | 11 +- be/src/cloud/cloud_tablet.cpp | 18 +++ be/src/cloud/cloud_tablet_mgr.cpp | 1 + be/src/olap/compaction.cpp | 5 + be/src/olap/rowset/rowset_writer_context.h | 2 + cloud/src/meta-service/meta_service.cpp | 17 +++ .../org/apache/doris/alter/CloudRollupJobV2.java | 7 +- .../apache/doris/alter/CloudSchemaChangeJobV2.java | 7 +- .../java/org/apache/doris/catalog/OlapTable.java | 12 +- .../cloud/alter/CloudSchemaChangeHandler.java | 160 ++++++++++++++++++++- .../cloud/datasource/CloudInternalCatalog.java | 19 ++- gensrc/proto/cloud.proto | 6 + 18 files changed, 483 insertions(+), 35 deletions(-) diff --git a/be/src/cloud/cloud_cumulative_compaction.cpp b/be/src/cloud/cloud_cumulative_compaction.cpp index 4e43246eebd..39ce711c9db 100644 --- a/be/src/cloud/cloud_cumulative_compaction.cpp +++ b/be/src/cloud/cloud_cumulative_compaction.cpp @@ -200,8 +200,11 @@ Status CloudCumulativeCompaction::execute_compact() { Status CloudCumulativeCompaction::modify_rowsets() { // calculate new cumulative point int64_t input_cumulative_point = cloud_tablet()->cumulative_layer_point(); - int64_t new_cumulative_point = _engine.cumu_compaction_policy()->new_cumulative_point( - cloud_tablet(), _output_rowset, _last_delete_version, input_cumulative_point); + auto compaction_policy = cloud_tablet()->tablet_meta()->compaction_policy(); + int64_t new_cumulative_point = + _engine.cumu_compaction_policy(compaction_policy) + ->new_cumulative_point(cloud_tablet(), _output_rowset, _last_delete_version, + input_cumulative_point); // commit compaction job cloud::TabletJobInfoPB job; auto idx = job.mutable_idx(); @@ -352,10 +355,12 @@ Status CloudCumulativeCompaction::pick_rowsets_to_compact() { } size_t compaction_score = 0; - _engine.cumu_compaction_policy()->pick_input_rowsets( - cloud_tablet(), candidate_rowsets, config::cumulative_compaction_max_deltas, - config::cumulative_compaction_min_deltas, &_input_rowsets, &_last_delete_version, - &compaction_score); + auto compaction_policy = cloud_tablet()->tablet_meta()->compaction_policy(); + _engine.cumu_compaction_policy(compaction_policy) + ->pick_input_rowsets(cloud_tablet(), candidate_rowsets, + config::cumulative_compaction_max_deltas, + config::cumulative_compaction_min_deltas, &_input_rowsets, + &_last_delete_version, &compaction_score); if (_input_rowsets.empty()) { return Status::Error<CUMULATIVE_NO_SUITABLE_VERSION>("no suitable versions"); diff --git a/be/src/cloud/cloud_cumulative_compaction_policy.cpp b/be/src/cloud/cloud_cumulative_compaction_policy.cpp index 5875340ec7b..ecf53bd2c68 100644 --- a/be/src/cloud/cloud_cumulative_compaction_policy.cpp +++ b/be/src/cloud/cloud_cumulative_compaction_policy.cpp @@ -49,7 +49,7 @@ int64_t CloudSizeBasedCumulativeCompactionPolicy::_level_size(const int64_t size return (int64_t)1 << (sizeof(size) * 8 - 1 - __builtin_clzl(size)); } -int CloudSizeBasedCumulativeCompactionPolicy::pick_input_rowsets( +int32_t CloudSizeBasedCumulativeCompactionPolicy::pick_input_rowsets( CloudTablet* tablet, const std::vector<RowsetSharedPtr>& candidate_rowsets, const int64_t max_compaction_score, const int64_t min_compaction_score, std::vector<RowsetSharedPtr>* input_rowsets, Version* last_delete_version, @@ -213,4 +213,152 @@ int64_t CloudSizeBasedCumulativeCompactionPolicy::new_cumulative_point( : last_cumulative_point; } +int32_t CloudTimeSeriesCumulativeCompactionPolicy::pick_input_rowsets( + CloudTablet* tablet, const std::vector<RowsetSharedPtr>& candidate_rowsets, + const int64_t max_compaction_score, const int64_t min_compaction_score, + std::vector<RowsetSharedPtr>* input_rowsets, Version* last_delete_version, + size_t* compaction_score, bool allow_delete) { + if (tablet->tablet_state() == TABLET_NOTREADY) { + return 0; + } + + int64_t compaction_goal_size_mbytes = + tablet->tablet_meta()->time_series_compaction_goal_size_mbytes(); + + int transient_size = 0; + *compaction_score = 0; + input_rowsets->clear(); + int64_t total_size = 0; + + for (const auto& rowset : candidate_rowsets) { + // check whether this rowset is delete version + if (!allow_delete && rowset->rowset_meta()->has_delete_predicate()) { + *last_delete_version = rowset->version(); + if (!input_rowsets->empty()) { + // we meet a delete version, and there were other versions before. + // we should compact those version before handling them over to base compaction + break; + } else { + // we meet a delete version, and no other versions before, skip it and continue + input_rowsets->clear(); + *compaction_score = 0; + transient_size = 0; + total_size = 0; + continue; + } + } + + *compaction_score += rowset->rowset_meta()->get_compaction_score(); + total_size += rowset->rowset_meta()->total_disk_size(); + + transient_size += 1; + input_rowsets->push_back(rowset); + + // Condition 1: the size of input files for compaction meets the requirement of parameter compaction_goal_size + if (total_size >= (compaction_goal_size_mbytes * 1024 * 1024)) { + if (input_rowsets->size() == 1 && + !input_rowsets->front()->rowset_meta()->is_segments_overlapping()) { + // Only 1 non-overlapping rowset, skip it + input_rowsets->clear(); + *compaction_score = 0; + total_size = 0; + continue; + } + return transient_size; + } + } + + // if there is delete version, do compaction directly + if (last_delete_version->first != -1) { + // if there is only one rowset and not overlapping, + // we do not need to do cumulative compaction + if (input_rowsets->size() == 1 && + !input_rowsets->front()->rowset_meta()->is_segments_overlapping()) { + input_rowsets->clear(); + *compaction_score = 0; + } + return transient_size; + } + + // Condition 2: the number of input files reaches the threshold specified by parameter compaction_file_count_threshold + if (*compaction_score >= tablet->tablet_meta()->time_series_compaction_file_count_threshold()) { + return transient_size; + } + + // Condition 3: level1 achieve compaction_goal_size + std::vector<RowsetSharedPtr> level1_rowsets; + if (tablet->tablet_meta()->time_series_compaction_level_threshold() >= 2) { + int64_t continuous_size = 0; + for (const auto& rowset : candidate_rowsets) { + const auto& rs_meta = rowset->rowset_meta(); + if (rs_meta->compaction_level() == 0) { + break; + } + level1_rowsets.push_back(rowset); + continuous_size += rs_meta->total_disk_size(); + if (level1_rowsets.size() >= 2) { + if (continuous_size >= compaction_goal_size_mbytes * 1024 * 1024) { + input_rowsets->swap(level1_rowsets); + return input_rowsets->size(); + } + } + } + } + + int64_t now = UnixMillis(); + int64_t last_cumu = tablet->last_cumu_compaction_success_time(); + if (last_cumu != 0) { + int64_t cumu_interval = now - last_cumu; + + // Condition 4: the time interval between compactions exceeds the value specified by parameter compaction_time_threshold_second + if (cumu_interval > + (tablet->tablet_meta()->time_series_compaction_time_threshold_seconds() * 1000)) { + if (tablet->tablet_meta()->time_series_compaction_level_threshold() >= 2) { + if (input_rowsets->empty() && level1_rowsets.size() >= 2) { + input_rowsets->swap(level1_rowsets); + return input_rowsets->size(); + } + } + return transient_size; + } + } + + input_rowsets->clear(); + *compaction_score = 0; + + return 0; +} + +int64_t CloudTimeSeriesCumulativeCompactionPolicy::new_compaction_level( + const std::vector<RowsetSharedPtr>& input_rowsets) { + int64_t first_level = 0; + for (size_t i = 0; i < input_rowsets.size(); i++) { + int64_t cur_level = input_rowsets[i]->rowset_meta()->compaction_level(); + if (i == 0) { + first_level = cur_level; + } else { + if (first_level != cur_level) { + LOG(ERROR) << "Failed to check compaction level, first_level: " << first_level + << ", cur_level: " << cur_level; + } + } + } + return first_level + 1; +} + +int64_t CloudTimeSeriesCumulativeCompactionPolicy::new_cumulative_point( + CloudTablet* tablet, const RowsetSharedPtr& output_rowset, Version& last_delete_version, + int64_t last_cumulative_point) { + if (tablet->tablet_state() != TABLET_RUNNING || output_rowset->num_segments() == 0) { + return last_cumulative_point; + } + + if (tablet->tablet_meta()->time_series_compaction_level_threshold() >= 2 && + output_rowset->rowset_meta()->compaction_level() < 2) { + return last_cumulative_point; + } + + return output_rowset->end_version() + 1; +} + } // namespace doris diff --git a/be/src/cloud/cloud_cumulative_compaction_policy.h b/be/src/cloud/cloud_cumulative_compaction_policy.h index dffe6e0cd0f..17edc41859e 100644 --- a/be/src/cloud/cloud_cumulative_compaction_policy.h +++ b/be/src/cloud/cloud_cumulative_compaction_policy.h @@ -34,7 +34,26 @@ namespace doris { class Tablet; struct Version; -class CloudSizeBasedCumulativeCompactionPolicy { +class CloudCumulativeCompactionPolicy { +public: + virtual ~CloudCumulativeCompactionPolicy() = default; + + virtual int64_t new_cumulative_point(CloudTablet* tablet, const RowsetSharedPtr& output_rowset, + Version& last_delete_version, + int64_t last_cumulative_point) = 0; + + virtual int64_t new_compaction_level(const std::vector<RowsetSharedPtr>& input_rowsets) = 0; + + virtual int32_t pick_input_rowsets(CloudTablet* tablet, + const std::vector<RowsetSharedPtr>& candidate_rowsets, + const int64_t max_compaction_score, + const int64_t min_compaction_score, + std::vector<RowsetSharedPtr>* input_rowsets, + Version* last_delete_version, size_t* compaction_score, + bool allow_delete = false) = 0; +}; + +class CloudSizeBasedCumulativeCompactionPolicy : public CloudCumulativeCompactionPolicy { public: CloudSizeBasedCumulativeCompactionPolicy( int64_t promotion_size = config::compaction_promotion_size_mbytes * 1024 * 1024, @@ -43,17 +62,23 @@ public: int64_t compaction_min_size = config::compaction_min_size_mbytes * 1024 * 1024, int64_t promotion_version_count = config::compaction_promotion_version_count); - ~CloudSizeBasedCumulativeCompactionPolicy() {} + ~CloudSizeBasedCumulativeCompactionPolicy() override = default; int64_t new_cumulative_point(CloudTablet* tablet, const RowsetSharedPtr& output_rowset, - Version& last_delete_version, int64_t last_cumulative_point); + Version& last_delete_version, + int64_t last_cumulative_point) override; + + int64_t new_compaction_level(const std::vector<RowsetSharedPtr>& input_rowsets) override { + return 0; + } - int pick_input_rowsets(CloudTablet* tablet, - const std::vector<RowsetSharedPtr>& candidate_rowsets, - const int64_t max_compaction_score, const int64_t min_compaction_score, - std::vector<RowsetSharedPtr>* input_rowsets, - Version* last_delete_version, size_t* compaction_score, - bool allow_delete = false); + int32_t pick_input_rowsets(CloudTablet* tablet, + const std::vector<RowsetSharedPtr>& candidate_rowsets, + const int64_t max_compaction_score, + const int64_t min_compaction_score, + std::vector<RowsetSharedPtr>* input_rowsets, + Version* last_delete_version, size_t* compaction_score, + bool allow_delete = false) override; private: int64_t _level_size(const int64_t size); @@ -73,4 +98,24 @@ private: int64_t _promotion_version_count; }; +class CloudTimeSeriesCumulativeCompactionPolicy : public CloudCumulativeCompactionPolicy { +public: + CloudTimeSeriesCumulativeCompactionPolicy() = default; + ~CloudTimeSeriesCumulativeCompactionPolicy() override = default; + + int64_t new_cumulative_point(CloudTablet* tablet, const RowsetSharedPtr& output_rowset, + Version& last_delete_version, + int64_t last_cumulative_point) override; + + int64_t new_compaction_level(const std::vector<RowsetSharedPtr>& input_rowsets) override; + + int32_t pick_input_rowsets(CloudTablet* tablet, + const std::vector<RowsetSharedPtr>& candidate_rowsets, + const int64_t max_compaction_score, + const int64_t min_compaction_score, + std::vector<RowsetSharedPtr>* input_rowsets, + Version* last_delete_version, size_t* compaction_score, + bool allow_delete = false) override; +}; + } // namespace doris diff --git a/be/src/cloud/cloud_rowset_builder.cpp b/be/src/cloud/cloud_rowset_builder.cpp index b1cbefba544..3585878cd72 100644 --- a/be/src/cloud/cloud_rowset_builder.cpp +++ b/be/src/cloud/cloud_rowset_builder.cpp @@ -46,6 +46,10 @@ Status CloudRowsetBuilder::init() { } RETURN_IF_ERROR(check_tablet_version_count()); + using namespace std::chrono; + std::static_pointer_cast<CloudTablet>(_tablet)->last_load_time_ms = + duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count(); + // build tablet schema in request level _build_current_tablet_schema(_req.index_id, _req.table_schema_param.get(), *_tablet->tablet_schema()); diff --git a/be/src/cloud/cloud_rowset_writer.cpp b/be/src/cloud/cloud_rowset_writer.cpp index 0111b511750..26c3fe714d3 100644 --- a/be/src/cloud/cloud_rowset_writer.cpp +++ b/be/src/cloud/cloud_rowset_writer.cpp @@ -46,6 +46,7 @@ Status CloudRowsetWriter::init(const RowsetWriterContext& rowset_writer_context) _rowset_meta->set_segments_overlap(_context.segments_overlap); _rowset_meta->set_txn_id(_context.txn_id); _rowset_meta->set_txn_expiration(_context.txn_expiration); + _rowset_meta->set_compaction_level(_context.compaction_level); if (_context.rowset_state == PREPARED || _context.rowset_state == COMMITTED) { _is_pending = true; _rowset_meta->set_load_id(_context.load_id); diff --git a/be/src/cloud/cloud_storage_engine.cpp b/be/src/cloud/cloud_storage_engine.cpp index 24ceeef4277..96e336b3ef3 100644 --- a/be/src/cloud/cloud_storage_engine.cpp +++ b/be/src/cloud/cloud_storage_engine.cpp @@ -39,6 +39,7 @@ #include "io/fs/s3_file_system.h" #include "io/hdfs_util.h" #include "olap/cumulative_compaction_policy.h" +#include "olap/cumulative_compaction_time_series_policy.h" #include "olap/memtable_flush_executor.h" #include "olap/storage_policy.h" #include "runtime/memory/cache_manager.h" @@ -68,9 +69,12 @@ int get_base_thread_num() { CloudStorageEngine::CloudStorageEngine(const UniqueId& backend_uid) : BaseStorageEngine(Type::CLOUD, backend_uid), _meta_mgr(std::make_unique<cloud::CloudMetaMgr>()), - _tablet_mgr(std::make_unique<CloudTabletMgr>(*this)), - _cumulative_compaction_policy( - std::make_shared<CloudSizeBasedCumulativeCompactionPolicy>()) {} + _tablet_mgr(std::make_unique<CloudTabletMgr>(*this)) { + _cumulative_compaction_policies[CUMULATIVE_SIZE_BASED_POLICY] = + std::make_shared<CloudSizeBasedCumulativeCompactionPolicy>(); + _cumulative_compaction_policies[CUMULATIVE_TIME_SERIES_POLICY] = + std::make_shared<CloudTimeSeriesCumulativeCompactionPolicy>(); +} CloudStorageEngine::~CloudStorageEngine() { stop(); @@ -766,4 +770,12 @@ Status CloudStorageEngine::get_compaction_status_json(std::string* result) { return Status::OK(); } +std::shared_ptr<CloudCumulativeCompactionPolicy> CloudStorageEngine::cumu_compaction_policy( + std::string_view compaction_policy) { + if (!_cumulative_compaction_policies.contains(compaction_policy)) { + return _cumulative_compaction_policies.at(CUMULATIVE_SIZE_BASED_POLICY); + } + return _cumulative_compaction_policies.at(compaction_policy); +} + } // namespace doris diff --git a/be/src/cloud/cloud_storage_engine.h b/be/src/cloud/cloud_storage_engine.h index b8c4da9cf64..297050360df 100644 --- a/be/src/cloud/cloud_storage_engine.h +++ b/be/src/cloud/cloud_storage_engine.h @@ -87,10 +87,6 @@ public: void get_cumu_compaction(int64_t tablet_id, std::vector<std::shared_ptr<CloudCumulativeCompaction>>& res); - CloudSizeBasedCumulativeCompactionPolicy* cumu_compaction_policy() const { - return _cumulative_compaction_policy.get(); - } - Status submit_compaction_task(const CloudTabletSPtr& tablet, CompactionType compaction_type); Status get_compaction_status_json(std::string* result); @@ -110,6 +106,9 @@ public: return _submitted_full_compactions.count(tablet_id); } + std::shared_ptr<CloudCumulativeCompactionPolicy> cumu_compaction_policy( + std::string_view compaction_policy); + private: void _refresh_storage_vault_info_thread_callback(); void _vacuum_stale_rowsets_thread_callback(); @@ -151,7 +150,9 @@ private: std::unique_ptr<ThreadPool> _base_compaction_thread_pool; std::unique_ptr<ThreadPool> _cumu_compaction_thread_pool; - std::shared_ptr<CloudSizeBasedCumulativeCompactionPolicy> _cumulative_compaction_policy; + using CumuPolices = + std::unordered_map<std::string_view, std::shared_ptr<CloudCumulativeCompactionPolicy>>; + CumuPolices _cumulative_compaction_policies; }; } // namespace doris diff --git a/be/src/cloud/cloud_tablet.cpp b/be/src/cloud/cloud_tablet.cpp index f4941deff0e..817df0e0e17 100644 --- a/be/src/cloud/cloud_tablet.cpp +++ b/be/src/cloud/cloud_tablet.cpp @@ -32,6 +32,7 @@ #include "cloud/cloud_storage_engine.h" #include "cloud/cloud_tablet_mgr.h" #include "io/cache/block_file_cache_factory.h" +#include "olap/cumulative_compaction_time_series_policy.h" #include "olap/olap_define.h" #include "olap/rowset/beta_rowset.h" #include "olap/rowset/rowset.h" @@ -409,6 +410,23 @@ Result<std::unique_ptr<RowsetWriter>> CloudTablet::create_transient_rowset_write } int64_t CloudTablet::get_cloud_base_compaction_score() const { + if (_tablet_meta->compaction_policy() == CUMULATIVE_TIME_SERIES_POLICY) { + bool has_delete = false; + int64_t point = cumulative_layer_point(); + for (const auto& rs_meta : _tablet_meta->all_rs_metas()) { + if (rs_meta->start_version() >= point) { + continue; + } + if (rs_meta->has_delete_predicate()) { + has_delete = true; + break; + } + } + if (!has_delete) { + return 0; + } + } + return _approximate_num_rowsets.load(std::memory_order_relaxed) - _approximate_cumu_num_rowsets.load(std::memory_order_relaxed); } diff --git a/be/src/cloud/cloud_tablet_mgr.cpp b/be/src/cloud/cloud_tablet_mgr.cpp index 5c4c9af2e7b..06bea6db126 100644 --- a/be/src/cloud/cloud_tablet_mgr.cpp +++ b/be/src/cloud/cloud_tablet_mgr.cpp @@ -327,6 +327,7 @@ Status CloudTabletMgr::get_topn_tablets_to_compact( if (t == nullptr) { continue; } int64_t s = score(t.get()); + if (s <= 0) { continue; } if (s > *max_score) { max_score_tablet_id = t->tablet_id(); *max_score = s; diff --git a/be/src/olap/compaction.cpp b/be/src/olap/compaction.cpp index a263d42ec6c..dec407894ef 100644 --- a/be/src/olap/compaction.cpp +++ b/be/src/olap/compaction.cpp @@ -1117,6 +1117,11 @@ Status CloudCompactionMixin::construct_output_rowset_writer(RowsetWriterContext& ctx.tablet_schema = _cur_tablet_schema; ctx.newest_write_timestamp = _newest_write_timestamp; ctx.write_type = DataWriteType::TYPE_COMPACTION; + + auto compaction_policy = _tablet->tablet_meta()->compaction_policy(); + ctx.compaction_level = + _engine.cumu_compaction_policy(compaction_policy)->new_compaction_level(_input_rowsets); + _output_rs_writer = DORIS_TRY(_tablet->create_rowset_writer(ctx, _is_vertical)); RETURN_IF_ERROR(_engine.meta_mgr().prepare_rowset(*_output_rs_writer->rowset_meta().get())); return Status::OK(); diff --git a/be/src/olap/rowset/rowset_writer_context.h b/be/src/olap/rowset/rowset_writer_context.h index e578a9a09ad..366af8cba4e 100644 --- a/be/src/olap/rowset/rowset_writer_context.h +++ b/be/src/olap/rowset/rowset_writer_context.h @@ -102,6 +102,8 @@ struct RowsetWriterContext { // In semi-structure senario tablet_schema will be updated concurrently, // this lock need to be held when update.Use shared_ptr to avoid delete copy contructor std::shared_ptr<std::mutex> schema_lock; + + int64_t compaction_level = 0; }; } // namespace doris diff --git a/cloud/src/meta-service/meta_service.cpp b/cloud/src/meta-service/meta_service.cpp index e2a19bce339..e27b6b5b944 100644 --- a/cloud/src/meta-service/meta_service.cpp +++ b/cloud/src/meta-service/meta_service.cpp @@ -697,6 +697,23 @@ void MetaServiceImpl::update_tablet(::google::protobuf::RpcController* controlle tablet_meta.set_group_commit_interval_ms(tablet_meta_info.group_commit_interval_ms()); } else if (tablet_meta_info.has_group_commit_data_bytes()) { tablet_meta.set_group_commit_data_bytes(tablet_meta_info.group_commit_data_bytes()); + } else if (tablet_meta_info.has_compaction_policy()) { + tablet_meta.set_compaction_policy(tablet_meta_info.compaction_policy()); + } else if (tablet_meta_info.has_time_series_compaction_goal_size_mbytes()) { + tablet_meta.set_time_series_compaction_goal_size_mbytes( + tablet_meta_info.time_series_compaction_goal_size_mbytes()); + } else if (tablet_meta_info.has_time_series_compaction_file_count_threshold()) { + tablet_meta.set_time_series_compaction_file_count_threshold( + tablet_meta_info.time_series_compaction_file_count_threshold()); + } else if (tablet_meta_info.has_time_series_compaction_time_threshold_seconds()) { + tablet_meta.set_time_series_compaction_time_threshold_seconds( + tablet_meta_info.time_series_compaction_time_threshold_seconds()); + } else if (tablet_meta_info.has_time_series_compaction_empty_rowsets_threshold()) { + tablet_meta.set_time_series_compaction_empty_rowsets_threshold( + tablet_meta_info.time_series_compaction_empty_rowsets_threshold()); + } else if (tablet_meta_info.has_time_series_compaction_level_threshold()) { + tablet_meta.set_time_series_compaction_level_threshold( + tablet_meta_info.time_series_compaction_level_threshold()); } int64_t table_id = tablet_meta.table_id(); int64_t index_id = tablet_meta.index_id(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/CloudRollupJobV2.java b/fe/fe-core/src/main/java/org/apache/doris/alter/CloudRollupJobV2.java index 2a7a2bc2f55..4bc34582fff 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/CloudRollupJobV2.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/CloudRollupJobV2.java @@ -189,7 +189,12 @@ public class CloudRollupJobV2 extends RollupJobV2 { tbl.isInMemory(), true, tbl.getName(), tbl.getTTLSeconds(), tbl.getEnableUniqueKeyMergeOnWrite(), tbl.storeRowColumn(), - tbl.getBaseSchemaVersion()); + tbl.getBaseSchemaVersion(), tbl.getCompactionPolicy(), + tbl.getTimeSeriesCompactionGoalSizeMbytes(), + tbl.getTimeSeriesCompactionFileCountThreshold(), + tbl.getTimeSeriesCompactionTimeThresholdSeconds(), + tbl.getTimeSeriesCompactionEmptyRowsetsThreshold(), + tbl.getTimeSeriesCompactionLevelThreshold()); requestBuilder.addTabletMetas(builder); } // end for rollupTablets ((CloudInternalCatalog) Env.getCurrentInternalCatalog()) diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/CloudSchemaChangeJobV2.java b/fe/fe-core/src/main/java/org/apache/doris/alter/CloudSchemaChangeJobV2.java index 4f474d188d4..cdfcf0a5e20 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/CloudSchemaChangeJobV2.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/CloudSchemaChangeJobV2.java @@ -201,7 +201,12 @@ public class CloudSchemaChangeJobV2 extends SchemaChangeJobV2 { tbl.getStoragePolicy(), tbl.isInMemory(), true, tbl.getName(), tbl.getTTLSeconds(), tbl.getEnableUniqueKeyMergeOnWrite(), tbl.storeRowColumn(), - shadowSchemaVersion); + shadowSchemaVersion, tbl.getCompactionPolicy(), + tbl.getTimeSeriesCompactionGoalSizeMbytes(), + tbl.getTimeSeriesCompactionFileCountThreshold(), + tbl.getTimeSeriesCompactionTimeThresholdSeconds(), + tbl.getTimeSeriesCompactionEmptyRowsetsThreshold(), + tbl.getTimeSeriesCompactionLevelThreshold()); requestBuilder.addTabletMetas(builder); } // end for rollupTablets ((CloudInternalCatalog) Env.getCurrentInternalCatalog()) diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java index 786769bd102..f35968ee22a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java @@ -2185,7 +2185,7 @@ public class OlapTable extends Table implements MTMVRelatedTableIf { if (tableProperty != null) { return tableProperty.compactionPolicy(); } - return ""; + return PropertyAnalyzer.SIZE_BASED_COMPACTION_POLICY; } public void setTimeSeriesCompactionGoalSizeMbytes(long timeSeriesCompactionGoalSizeMbytes) { @@ -2199,7 +2199,7 @@ public class OlapTable extends Table implements MTMVRelatedTableIf { if (tableProperty != null) { return tableProperty.timeSeriesCompactionGoalSizeMbytes(); } - return null; + return PropertyAnalyzer.TIME_SERIES_COMPACTION_GOAL_SIZE_MBYTES_DEFAULT_VALUE; } public void setTimeSeriesCompactionFileCountThreshold(long timeSeriesCompactionFileCountThreshold) { @@ -2213,7 +2213,7 @@ public class OlapTable extends Table implements MTMVRelatedTableIf { if (tableProperty != null) { return tableProperty.timeSeriesCompactionFileCountThreshold(); } - return null; + return PropertyAnalyzer.TIME_SERIES_COMPACTION_FILE_COUNT_THRESHOLD_DEFAULT_VALUE; } public void setTimeSeriesCompactionTimeThresholdSeconds(long timeSeriesCompactionTimeThresholdSeconds) { @@ -2228,7 +2228,7 @@ public class OlapTable extends Table implements MTMVRelatedTableIf { if (tableProperty != null) { return tableProperty.timeSeriesCompactionTimeThresholdSeconds(); } - return null; + return PropertyAnalyzer.TIME_SERIES_COMPACTION_TIME_THRESHOLD_SECONDS_DEFAULT_VALUE; } public void setTimeSeriesCompactionEmptyRowsetsThreshold(long timeSeriesCompactionEmptyRowsetsThreshold) { @@ -2242,7 +2242,7 @@ public class OlapTable extends Table implements MTMVRelatedTableIf { if (tableProperty != null) { return tableProperty.timeSeriesCompactionEmptyRowsetsThreshold(); } - return null; + return PropertyAnalyzer.TIME_SERIES_COMPACTION_EMPTY_ROWSETS_THRESHOLD_DEFAULT_VALUE; } public void setTimeSeriesCompactionLevelThreshold(long timeSeriesCompactionLevelThreshold) { @@ -2256,7 +2256,7 @@ public class OlapTable extends Table implements MTMVRelatedTableIf { if (tableProperty != null) { return tableProperty.timeSeriesCompactionLevelThreshold(); } - return null; + return PropertyAnalyzer.TIME_SERIES_COMPACTION_LEVEL_THRESHOLD_DEFAULT_VALUE; } public int getBaseSchemaVersion() { diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/alter/CloudSchemaChangeHandler.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/alter/CloudSchemaChangeHandler.java index 573b394e79f..48602a0a8a4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/cloud/alter/CloudSchemaChangeHandler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/alter/CloudSchemaChangeHandler.java @@ -92,7 +92,13 @@ public class CloudSchemaChangeHandler extends SchemaChangeHandler { throws UserException { Preconditions.checkState(properties.containsKey(PropertyAnalyzer.PROPERTIES_GROUP_COMMIT_INTERVAL_MS) || properties.containsKey(PropertyAnalyzer.PROPERTIES_GROUP_COMMIT_DATA_BYTES) - || properties.containsKey(PropertyAnalyzer.PROPERTIES_FILE_CACHE_TTL_SECONDS)); + || properties.containsKey(PropertyAnalyzer.PROPERTIES_FILE_CACHE_TTL_SECONDS) + || properties.containsKey(PropertyAnalyzer.PROPERTIES_COMPACTION_POLICY) + || properties.containsKey(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_GOAL_SIZE_MBYTES) + || properties.containsKey(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_FILE_COUNT_THRESHOLD) + || properties.containsKey(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_TIME_THRESHOLD_SECONDS) + || properties.containsKey(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_EMPTY_ROWSETS_THRESHOLD) + || properties.containsKey(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_LEVEL_THRESHOLD)); if (properties.size() != 1) { throw new UserException("Can only set one table property at a time"); @@ -149,6 +155,123 @@ public class CloudSchemaChangeHandler extends SchemaChangeHandler { } param.groupCommitDataBytes = groupCommitDataBytes; param.type = UpdatePartitionMetaParam.TabletMetaType.GROUP_COMMIT_DATA_BYTES; + } else if (properties.containsKey(PropertyAnalyzer.PROPERTIES_COMPACTION_POLICY)) { + String compactionPolicy = properties.get(PropertyAnalyzer.PROPERTIES_COMPACTION_POLICY); + if (compactionPolicy != null + && !compactionPolicy.equals(PropertyAnalyzer.TIME_SERIES_COMPACTION_POLICY) + && !compactionPolicy.equals(PropertyAnalyzer.SIZE_BASED_COMPACTION_POLICY)) { + throw new UserException("Table compaction policy only support for " + + PropertyAnalyzer.TIME_SERIES_COMPACTION_POLICY + + " or " + PropertyAnalyzer.SIZE_BASED_COMPACTION_POLICY); + } + olapTable.readLock(); + try { + if (compactionPolicy == olapTable.getCompactionPolicy()) { + LOG.info("compactionPolicy:{} is equal with olapTable.getCompactionPolicy():{}", + compactionPolicy, olapTable.getCompactionPolicy()); + return; + } + partitions.addAll(olapTable.getPartitions()); + } finally { + olapTable.readUnlock(); + } + param.compactionPolicy = compactionPolicy; + param.type = UpdatePartitionMetaParam.TabletMetaType.COMPACTION_POLICY; + } else if (properties.containsKey(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_GOAL_SIZE_MBYTES)) { + long timeSeriesCompactionGoalSizeMbytes = Long.parseLong(properties.get(PropertyAnalyzer + .PROPERTIES_TIME_SERIES_COMPACTION_GOAL_SIZE_MBYTES)); + olapTable.readLock(); + try { + if (timeSeriesCompactionGoalSizeMbytes + == olapTable.getTimeSeriesCompactionGoalSizeMbytes()) { + LOG.info("timeSeriesCompactionGoalSizeMbytes:{} is equal with" + + " olapTable.timeSeriesCompactionGoalSizeMbytes():{}", + timeSeriesCompactionGoalSizeMbytes, + olapTable.getTimeSeriesCompactionGoalSizeMbytes()); + return; + } + partitions.addAll(olapTable.getPartitions()); + } finally { + olapTable.readUnlock(); + } + param.timeSeriesCompactionGoalSizeMbytes = timeSeriesCompactionGoalSizeMbytes; + param.type = UpdatePartitionMetaParam.TabletMetaType.TIME_SERIES_COMPACTION_GOAL_SIZE_MBYTES; + } else if (properties.containsKey(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_FILE_COUNT_THRESHOLD)) { + long timeSeriesCompactionFileCountThreshold = Long.parseLong(properties.get(PropertyAnalyzer + .PROPERTIES_TIME_SERIES_COMPACTION_FILE_COUNT_THRESHOLD)); + olapTable.readLock(); + try { + if (timeSeriesCompactionFileCountThreshold + == olapTable.getTimeSeriesCompactionFileCountThreshold()) { + LOG.info("timeSeriesCompactionFileCountThreshold:{} is equal with" + + " olapTable.getTimeSeriesCompactionFileCountThreshold():{}", + timeSeriesCompactionFileCountThreshold, + olapTable.getTimeSeriesCompactionFileCountThreshold()); + return; + } + partitions.addAll(olapTable.getPartitions()); + } finally { + olapTable.readUnlock(); + } + param.timeSeriesCompactionFileCountThreshold = timeSeriesCompactionFileCountThreshold; + param.type = UpdatePartitionMetaParam.TabletMetaType.TIME_SERIES_COMPACTION_FILE_COUNT_THRESHOLD; + } else if (properties.containsKey(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_TIME_THRESHOLD_SECONDS)) { + long timeSeriesCompactionTimeThresholdSeconds = Long.parseLong(properties.get(PropertyAnalyzer + .PROPERTIES_TIME_SERIES_COMPACTION_TIME_THRESHOLD_SECONDS)); + olapTable.readLock(); + try { + if (timeSeriesCompactionTimeThresholdSeconds + == olapTable.getTimeSeriesCompactionTimeThresholdSeconds()) { + LOG.info("timeSeriesCompactionTimeThresholdSeconds:{} is equal with" + + " olapTable.getTimeSeriesCompactionTimeThresholdSeconds():{}", + timeSeriesCompactionTimeThresholdSeconds, + olapTable.getTimeSeriesCompactionTimeThresholdSeconds()); + return; + } + partitions.addAll(olapTable.getPartitions()); + } finally { + olapTable.readUnlock(); + } + param.timeSeriesCompactionTimeThresholdSeconds = timeSeriesCompactionTimeThresholdSeconds; + param.type = UpdatePartitionMetaParam.TabletMetaType.TIME_SERIES_COMPACTION_TIME_THRESHOLD_SECONDS; + } else if (properties.containsKey(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_EMPTY_ROWSETS_THRESHOLD)) { + long timeSeriesCompactionEmptyRowsetsThreshold = Long.parseLong(properties.get(PropertyAnalyzer + .PROPERTIES_TIME_SERIES_COMPACTION_EMPTY_ROWSETS_THRESHOLD)); + olapTable.readLock(); + try { + if (timeSeriesCompactionEmptyRowsetsThreshold + == olapTable.getTimeSeriesCompactionEmptyRowsetsThreshold()) { + LOG.info("timeSeriesCompactionEmptyRowsetsThreshold:{} is equal with" + + " olapTable.getTimeSeriesCompactionEmptyRowsetsThreshold():{}", + timeSeriesCompactionEmptyRowsetsThreshold, + olapTable.getTimeSeriesCompactionEmptyRowsetsThreshold()); + return; + } + partitions.addAll(olapTable.getPartitions()); + } finally { + olapTable.readUnlock(); + } + param.timeSeriesCompactionEmptyRowsetsThreshold = timeSeriesCompactionEmptyRowsetsThreshold; + param.type = UpdatePartitionMetaParam.TabletMetaType.TIME_SERIES_COMPACTION_EMPTY_ROWSETS_THRESHOLD; + } else if (properties.containsKey(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_LEVEL_THRESHOLD)) { + long timeSeriesCompactionLevelThreshold = Long.parseLong(properties.get(PropertyAnalyzer + .PROPERTIES_TIME_SERIES_COMPACTION_LEVEL_THRESHOLD)); + olapTable.readLock(); + try { + if (timeSeriesCompactionLevelThreshold + == olapTable.getTimeSeriesCompactionLevelThreshold()) { + LOG.info("timeSeriesCompactionLevelThreshold:{} is equal with" + + " olapTable.getTimeSeriesCompactionLevelThreshold():{}", + timeSeriesCompactionLevelThreshold, + olapTable.getTimeSeriesCompactionLevelThreshold()); + return; + } + partitions.addAll(olapTable.getPartitions()); + } finally { + olapTable.readUnlock(); + } + param.timeSeriesCompactionLevelThreshold = timeSeriesCompactionLevelThreshold; + param.type = UpdatePartitionMetaParam.TabletMetaType.TIME_SERIES_COMPACTION_LEVEL_THRESHOLD; } else { LOG.warn("invalid properties:{}", properties); throw new UserException("invalid properties"); @@ -173,6 +296,12 @@ public class CloudSchemaChangeHandler extends SchemaChangeHandler { TTL_SECONDS, GROUP_COMMIT_INTERVAL_MS, GROUP_COMMIT_DATA_BYTES, + COMPACTION_POLICY, + TIME_SERIES_COMPACTION_GOAL_SIZE_MBYTES, + TIME_SERIES_COMPACTION_FILE_COUNT_THRESHOLD, + TIME_SERIES_COMPACTION_TIME_THRESHOLD_SECONDS, + TIME_SERIES_COMPACTION_EMPTY_ROWSETS_THRESHOLD, + TIME_SERIES_COMPACTION_LEVEL_THRESHOLD, } TabletMetaType type; @@ -181,6 +310,12 @@ public class CloudSchemaChangeHandler extends SchemaChangeHandler { long ttlSeconds = 0; long groupCommitIntervalMs = 0; long groupCommitDataBytes = 0; + String compactionPolicy; + long timeSeriesCompactionGoalSizeMbytes = 0; + long timeSeriesCompactionFileCountThreshold = 0; + long timeSeriesCompactionTimeThresholdSeconds = 0; + long timeSeriesCompactionEmptyRowsetsThreshold = 0; + long timeSeriesCompactionLevelThreshold = 0; } public void updateCloudPartitionMeta(Database db, @@ -228,6 +363,29 @@ public class CloudSchemaChangeHandler extends SchemaChangeHandler { case GROUP_COMMIT_DATA_BYTES: infoBuilder.setGroupCommitDataBytes(param.groupCommitDataBytes); break; + case COMPACTION_POLICY: + infoBuilder.setCompactionPolicy(param.compactionPolicy); + break; + case TIME_SERIES_COMPACTION_GOAL_SIZE_MBYTES: + infoBuilder.setTimeSeriesCompactionGoalSizeMbytes( + param.timeSeriesCompactionGoalSizeMbytes); + break; + case TIME_SERIES_COMPACTION_FILE_COUNT_THRESHOLD: + infoBuilder.setTimeSeriesCompactionFileCountThreshold( + param.timeSeriesCompactionFileCountThreshold); + break; + case TIME_SERIES_COMPACTION_TIME_THRESHOLD_SECONDS: + infoBuilder.setTimeSeriesCompactionTimeThresholdSeconds( + param.timeSeriesCompactionTimeThresholdSeconds); + break; + case TIME_SERIES_COMPACTION_EMPTY_ROWSETS_THRESHOLD: + infoBuilder.setTimeSeriesCompactionEmptyRowsetsThreshold( + param.timeSeriesCompactionEmptyRowsetsThreshold); + break; + case TIME_SERIES_COMPACTION_LEVEL_THRESHOLD: + infoBuilder.setTimeSeriesCompactionLevelThreshold( + param.timeSeriesCompactionLevelThreshold); + break; default: throw new UserException("Unknown TabletMetaType"); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/datasource/CloudInternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/datasource/CloudInternalCatalog.java index 1eaf415b152..42ca4bfe99c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/cloud/datasource/CloudInternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/datasource/CloudInternalCatalog.java @@ -155,7 +155,12 @@ public class CloudInternalCatalog extends InternalCatalog { partitionId, tablet, tabletType, schemaHash, keysType, shortKeyColumnCount, bfColumns, tbl.getBfFpp(), indexes, columns, tbl.getDataSortInfo(), tbl.getCompressionType(), storagePolicy, isInMemory, false, tbl.getName(), tbl.getTTLSeconds(), - tbl.getEnableUniqueKeyMergeOnWrite(), tbl.storeRowColumn(), indexMeta.getSchemaVersion()); + tbl.getEnableUniqueKeyMergeOnWrite(), tbl.storeRowColumn(), indexMeta.getSchemaVersion(), + tbl.getCompactionPolicy(), tbl.getTimeSeriesCompactionGoalSizeMbytes(), + tbl.getTimeSeriesCompactionFileCountThreshold(), + tbl.getTimeSeriesCompactionTimeThresholdSeconds(), + tbl.getTimeSeriesCompactionEmptyRowsetsThreshold(), + tbl.getTimeSeriesCompactionLevelThreshold()); requestBuilder.addTabletMetas(builder); } if (!storageVaultIdSet && ((CloudEnv) Env.getCurrentEnv()).getEnableStorageVault()) { @@ -198,7 +203,10 @@ public class CloudInternalCatalog extends InternalCatalog { List<Column> schemaColumns, DataSortInfo dataSortInfo, TCompressionType compressionType, String storagePolicy, boolean isInMemory, boolean isShadow, String tableName, long ttlSeconds, boolean enableUniqueKeyMergeOnWrite, - boolean storeRowColumn, int schemaVersion) throws DdlException { + boolean storeRowColumn, int schemaVersion, String compactionPolicy, + Long timeSeriesCompactionGoalSizeMbytes, Long timeSeriesCompactionFileCountThreshold, + Long timeSeriesCompactionTimeThresholdSeconds, Long timeSeriesCompactionEmptyRowsetsThreshold, + Long timeSeriesCompactionLevelThreshold) throws DdlException { OlapFile.TabletMetaCloudPB.Builder builder = OlapFile.TabletMetaCloudPB.newBuilder(); builder.setTableId(tableId); builder.setIndexId(indexId); @@ -227,6 +235,13 @@ public class CloudInternalCatalog extends InternalCatalog { builder.setReplicaId(tablet.getReplicas().get(0).getId()); builder.setEnableUniqueKeyMergeOnWrite(enableUniqueKeyMergeOnWrite); + builder.setCompactionPolicy(compactionPolicy); + builder.setTimeSeriesCompactionGoalSizeMbytes(timeSeriesCompactionGoalSizeMbytes); + builder.setTimeSeriesCompactionFileCountThreshold(timeSeriesCompactionFileCountThreshold); + builder.setTimeSeriesCompactionTimeThresholdSeconds(timeSeriesCompactionTimeThresholdSeconds); + builder.setTimeSeriesCompactionEmptyRowsetsThreshold(timeSeriesCompactionEmptyRowsetsThreshold); + builder.setTimeSeriesCompactionLevelThreshold(timeSeriesCompactionLevelThreshold); + OlapFile.TabletSchemaCloudPB.Builder schemaBuilder = OlapFile.TabletSchemaCloudPB.newBuilder(); schemaBuilder.setSchemaVersion(schemaVersion); diff --git a/gensrc/proto/cloud.proto b/gensrc/proto/cloud.proto index 3054b9245f5..b856d50206b 100644 --- a/gensrc/proto/cloud.proto +++ b/gensrc/proto/cloud.proto @@ -475,6 +475,12 @@ message TabletMetaInfoPB { // For update tablet meta optional int64 ttl_seconds = 4; optional int64 group_commit_interval_ms = 5; optional int64 group_commit_data_bytes = 6; + optional string compaction_policy = 7; + optional int64 time_series_compaction_goal_size_mbytes = 8; + optional int64 time_series_compaction_file_count_threshold = 9; + optional int64 time_series_compaction_time_threshold_seconds = 10; + optional int64 time_series_compaction_empty_rowsets_threshold = 11; + optional int64 time_series_compaction_level_threshold = 12; } message TabletCompactionJobPB { --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
