This is an automated email from the ASF dual-hosted git repository. kxiao pushed a commit to branch branch-2.0 in repository https://gitbox.apache.org/repos/asf/doris.git
commit 54282029a0fb170106c92784a7d5dd22be151c8a Author: Chenyang Sun <[email protected]> AuthorDate: Tue Aug 1 22:02:23 2023 +0800 [improvement](compaction) compaction policy and options in the properties of a table (#22461) --- be/src/agent/task_worker_pool.cpp | 41 ++++ be/src/common/config.cpp | 14 -- be/src/common/config.h | 11 -- be/src/http/action/compaction_action.cpp | 4 +- be/src/olap/compaction.cpp | 6 +- be/src/olap/cumulative_compaction_policy.cpp | 7 +- be/src/olap/cumulative_compaction_policy.h | 5 +- .../cumulative_compaction_time_series_policy.cpp | 27 +-- .../cumulative_compaction_time_series_policy.h | 8 +- be/src/olap/olap_server.cpp | 22 ++- be/src/olap/storage_engine.h | 4 +- be/src/olap/tablet.cpp | 5 +- be/src/olap/tablet_manager.cpp | 7 +- be/src/olap/tablet_manager.h | 3 +- be/src/olap/tablet_meta.cpp | 49 ++++- be/src/olap/tablet_meta.h | 35 +++- be/test/olap/cumulative_compaction_policy_test.cpp | 6 +- ...mulative_compaction_time_series_policy_test.cpp | 28 +-- docs/en/docs/admin-manual/config/be-config.md | 27 --- .../Create/CREATE-TABLE.md | 31 +++ docs/zh-CN/docs/admin-manual/config/be-config.md | 27 --- .../Create/CREATE-TABLE.md | 30 +++ .../main/java/org/apache/doris/alter/Alter.java | 30 ++- .../org/apache/doris/alter/AlterOperations.java | 54 ++++++ .../java/org/apache/doris/alter/RollupJobV2.java | 4 + .../apache/doris/alter/SchemaChangeHandler.java | 119 +++++++++++- .../org/apache/doris/alter/SchemaChangeJobV2.java | 4 + .../analysis/ModifyTablePropertiesClause.java | 99 ++++++++++ .../java/org/apache/doris/backup/RestoreJob.java | 4 + .../main/java/org/apache/doris/catalog/Env.java | 31 +++ .../java/org/apache/doris/catalog/OlapTable.java | 56 ++++++ .../org/apache/doris/catalog/TableProperty.java | 60 ++++++ .../apache/doris/common/util/PropertyAnalyzer.java | 108 +++++++++++ .../apache/doris/datasource/InternalCatalog.java | 89 ++++++++- .../org/apache/doris/master/ReportHandler.java | 5 +- 
.../org/apache/doris/task/CreateReplicaTask.java | 21 ++- .../doris/task/UpdateTabletMetaInfoTask.java | 40 ++++ .../java/org/apache/doris/task/AgentTaskTest.java | 2 +- gensrc/proto/olap_file.proto | 4 + gensrc/thrift/AgentService.thrift | 8 + .../test_table_level_compaction_policy.groovy | 207 +++++++++++++++++++++ 41 files changed, 1198 insertions(+), 144 deletions(-) diff --git a/be/src/agent/task_worker_pool.cpp b/be/src/agent/task_worker_pool.cpp index 1c44f30548..8d90c8b1b0 100644 --- a/be/src/agent/task_worker_pool.cpp +++ b/be/src/agent/task_worker_pool.cpp @@ -433,6 +433,47 @@ void TaskWorkerPool::_update_tablet_meta_worker_thread_callback() { tablet->tablet_schema_unlocked()->set_is_in_memory(tablet_meta_info.is_in_memory); need_to_save = true; } + if (tablet_meta_info.__isset.compaction_policy) { + if (tablet_meta_info.compaction_policy != "size_based" && + tablet_meta_info.compaction_policy != "time_series") { + status = Status::InvalidArgument( + "invalid compaction policy, only support for size_based or " + "time_series"); + continue; + } + tablet->tablet_meta()->set_compaction_policy(tablet_meta_info.compaction_policy); + need_to_save = true; + } + if (tablet_meta_info.__isset.time_series_compaction_goal_size_mbytes) { + if (tablet->tablet_meta()->compaction_policy() != "time_series") { + status = Status::InvalidArgument( + "only time series compaction policy support time series config"); + continue; + } + tablet->tablet_meta()->set_time_series_compaction_goal_size_mbytes( + tablet_meta_info.time_series_compaction_goal_size_mbytes); + need_to_save = true; + } + if (tablet_meta_info.__isset.time_series_compaction_file_count_threshold) { + if (tablet->tablet_meta()->compaction_policy() != "time_series") { + status = Status::InvalidArgument( + "only time series compaction policy support time series config"); + continue; + } + tablet->tablet_meta()->set_time_series_compaction_file_count_threshold( + 
tablet_meta_info.time_series_compaction_file_count_threshold); + need_to_save = true; + } + if (tablet_meta_info.__isset.time_series_compaction_time_threshold_seconds) { + if (tablet->tablet_meta()->compaction_policy() != "time_series") { + status = Status::InvalidArgument( + "only time series compaction policy support time series config"); + continue; + } + tablet->tablet_meta()->set_time_series_compaction_time_threshold_seconds( + tablet_meta_info.time_series_compaction_time_threshold_seconds); + need_to_save = true; + } if (tablet_meta_info.__isset.replica_id) { tablet->tablet_meta()->set_replica_id(tablet_meta_info.replica_id); } diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index 87d1a6ccc5..912fae1b5c 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -973,20 +973,6 @@ DEFINE_Int32(num_broadcast_buffer, "32"); // semi-structure configs DEFINE_Bool(enable_parse_multi_dimession_array, "false"); -// Currently, two compaction strategies are implemented, SIZE_BASED and TIME_SERIES. -// In the case of time series compaction, the execution of compaction is adjusted -// using parameters that have the prefix time_series_compaction. -DEFINE_mString(compaction_policy, "size_based"); -DEFINE_Validator(compaction_policy, [](const std::string config) -> bool { - return config == "size_based" || config == "time_series"; -}); -// the size of input files for each compaction -DEFINE_mInt64(time_series_compaction_goal_size_mbytes, "512"); -// the minimum number of input files for each compaction if time_series_compaction_goal_size_mbytes not meets -DEFINE_mInt64(time_series_compaction_file_count_threshold, "2000"); -// if compaction has not been performed within 3600 seconds, a compaction will be triggered -DEFINE_mInt64(time_series_compaction_time_threshold_seconds, "3600"); - // max depth of expression tree allowed. 
DEFINE_Int32(max_depth_of_expr_tree, "600"); diff --git a/be/src/common/config.h b/be/src/common/config.h index 0fd5c648d7..c38badffe7 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -1005,17 +1005,6 @@ DECLARE_Int32(num_broadcast_buffer); // semi-structure configs DECLARE_Bool(enable_parse_multi_dimession_array); -// Currently, two compaction strategies are implemented, SIZE_BASED and TIME_SERIES. -// In the case of time series compaction, the execution of compaction is adjusted -// using parameters that have the prefix time_series_compaction. -DECLARE_mString(compaction_policy); -// the size of input files for each compaction -DECLARE_mInt64(time_series_compaction_goal_size_mbytes); -// the minimum number of input files for each compaction if time_series_compaction_goal_size_mbytes not meets -DECLARE_mInt64(time_series_compaction_file_count_threshold); -// if compaction has not been performed within 3600 seconds, a compaction will be triggered -DECLARE_mInt64(time_series_compaction_time_threshold_seconds); - // max depth of expression tree allowed. 
DECLARE_Int32(max_depth_of_expr_tree); diff --git a/be/src/http/action/compaction_action.cpp b/be/src/http/action/compaction_action.cpp index 81f6ad4d30..d94ca60432 100644 --- a/be/src/http/action/compaction_action.cpp +++ b/be/src/http/action/compaction_action.cpp @@ -37,6 +37,7 @@ #include "olap/base_compaction.h" #include "olap/cumulative_compaction.h" #include "olap/cumulative_compaction_policy.h" +#include "olap/cumulative_compaction_time_series_policy.h" #include "olap/full_compaction.h" #include "olap/olap_define.h" #include "olap/storage_engine.h" @@ -198,7 +199,8 @@ Status CompactionAction::_execute_compaction_callback(TabletSharedPtr tablet, timer.start(); std::shared_ptr<CumulativeCompactionPolicy> cumulative_compaction_policy = - CumulativeCompactionPolicyFactory::create_cumulative_compaction_policy(); + CumulativeCompactionPolicyFactory::create_cumulative_compaction_policy( + tablet->tablet_meta()->compaction_policy()); if (tablet->get_cumulative_compaction_policy() == nullptr) { tablet->set_cumulative_compaction_policy(cumulative_compaction_policy); } diff --git a/be/src/olap/compaction.cpp b/be/src/olap/compaction.cpp index d67248919f..2f24fbd071 100644 --- a/be/src/olap/compaction.cpp +++ b/be/src/olap/compaction.cpp @@ -148,8 +148,10 @@ int64_t Compaction::get_avg_segment_rows() { // input_rowsets_size is total disk_size of input_rowset, this size is the // final size after codec and compress, so expect dest segment file size // in disk is config::vertical_compaction_max_segment_size - if (config::compaction_policy == CUMULATIVE_TIME_SERIES_POLICY) { - return (config::time_series_compaction_goal_size_mbytes * 1024 * 1024 * 2) / + const auto& meta = _tablet->tablet_meta(); + if (meta->compaction_policy() == CUMULATIVE_TIME_SERIES_POLICY) { + int64_t compaction_goal_size_mbytes = meta->time_series_compaction_goal_size_mbytes(); + return (compaction_goal_size_mbytes * 1024 * 1024 * 2) / (_input_rowsets_size / (_input_row_num + 1) + 1); } return 
config::vertical_compaction_max_segment_size / diff --git a/be/src/olap/cumulative_compaction_policy.cpp b/be/src/olap/cumulative_compaction_policy.cpp index 17773e831e..c3d5047cec 100644 --- a/be/src/olap/cumulative_compaction_policy.cpp +++ b/be/src/olap/cumulative_compaction_policy.cpp @@ -368,9 +368,12 @@ int64_t SizeBasedCumulativeCompactionPolicy::_level_size(const int64_t size) { } std::shared_ptr<CumulativeCompactionPolicy> -CumulativeCompactionPolicyFactory::create_cumulative_compaction_policy() { - if (config::compaction_policy == CUMULATIVE_TIME_SERIES_POLICY) { +CumulativeCompactionPolicyFactory::create_cumulative_compaction_policy( + const std::string_view& compaction_policy) { + if (compaction_policy == CUMULATIVE_TIME_SERIES_POLICY) { return std::make_shared<TimeSeriesCumulativeCompactionPolicy>(); + } else if (compaction_policy == CUMULATIVE_SIZE_BASED_POLICY) { + return std::make_shared<SizeBasedCumulativeCompactionPolicy>(); } return std::make_shared<SizeBasedCumulativeCompactionPolicy>(); } diff --git a/be/src/olap/cumulative_compaction_policy.h b/be/src/olap/cumulative_compaction_policy.h index 65f970e4af..836100903b 100644 --- a/be/src/olap/cumulative_compaction_policy.h +++ b/be/src/olap/cumulative_compaction_policy.h @@ -33,7 +33,7 @@ namespace doris { class Tablet; struct Version; -inline std::string_view CUMULATIVE_SIZE_BASED_POLICY = "size_based"; +inline constexpr std::string_view CUMULATIVE_SIZE_BASED_POLICY = "size_based"; /// This class CumulativeCompactionPolicy is the base class of cumulative compaction policy. /// It defines the policy to do cumulative compaction. It has different derived classes, which implements @@ -176,7 +176,8 @@ class CumulativeCompactionPolicyFactory { public: /// Static factory function. It can product different policy according to the `policy` parameter and use tablet ptr /// to construct the policy. Now it can product size based and num based policies. 
- static std::shared_ptr<CumulativeCompactionPolicy> create_cumulative_compaction_policy(); + static std::shared_ptr<CumulativeCompactionPolicy> create_cumulative_compaction_policy( + const std::string_view& compaction_policy); }; } // namespace doris diff --git a/be/src/olap/cumulative_compaction_time_series_policy.cpp b/be/src/olap/cumulative_compaction_time_series_policy.cpp index ac816d0e54..1e144b6a2c 100644 --- a/be/src/olap/cumulative_compaction_time_series_policy.cpp +++ b/be/src/olap/cumulative_compaction_time_series_policy.cpp @@ -63,13 +63,15 @@ uint32_t TimeSeriesCumulativeCompactionPolicy::calc_cumulative_compaction_score( return 0; } - // Condition 1: the size of input files for compaction meets the requirement of parameter _compaction_goal_size - if (total_size >= (config::time_series_compaction_goal_size_mbytes * 1024 * 1024)) { + // Condition 1: the size of input files for compaction meets the requirement of parameter compaction_goal_size + int64_t compaction_goal_size_mbytes = + tablet->tablet_meta()->time_series_compaction_goal_size_mbytes(); + if (total_size >= compaction_goal_size_mbytes * 1024 * 1024) { return score; } - // Condition 2: the number of input files reaches the threshold specified by parameter _compaction_file_count_threshold - if (score >= config::time_series_compaction_file_count_threshold) { + // Condition 2: the number of input files reaches the threshold specified by parameter compaction_file_count_threshold + if (score >= tablet->tablet_meta()->time_series_compaction_file_count_threshold()) { return score; } @@ -79,7 +81,8 @@ uint32_t TimeSeriesCumulativeCompactionPolicy::calc_cumulative_compaction_score( int64_t cumu_interval = now - last_cumu; // Condition 3: the time interval between compactions exceeds the value specified by parameter _compaction_time_threshold_second - if (cumu_interval > (config::time_series_compaction_time_threshold_seconds * 1000)) { + if (cumu_interval > + 
(tablet->tablet_meta()->time_series_compaction_time_threshold_seconds() * 1000)) { return score; } } else if (score > 0) { @@ -199,8 +202,9 @@ int TimeSeriesCumulativeCompactionPolicy::pick_input_rowsets( transient_size += 1; input_rowsets->push_back(rowset); - // Condition 1: the size of input files for compaction meets the requirement of parameter _compaction_goal_size - if (total_size >= (config::time_series_compaction_goal_size_mbytes * 1024 * 1024)) { + // Condition 1: the size of input files for compaction meets the requirement of parameter compaction_goal_size + if (total_size >= + (tablet->tablet_meta()->time_series_compaction_goal_size_mbytes() * 1024 * 1024)) { if (input_rowsets->size() == 1 && !input_rowsets->front()->rowset_meta()->is_segments_overlapping()) { // Only 1 non-overlapping rowset, skip it @@ -225,8 +229,8 @@ int TimeSeriesCumulativeCompactionPolicy::pick_input_rowsets( return transient_size; } - // Condition 2: the number of input files reaches the threshold specified by parameter _compaction_file_count_threshold - if (*compaction_score >= config::time_series_compaction_file_count_threshold) { + // Condition 2: the number of input files reaches the threshold specified by parameter compaction_file_count_threshold + if (*compaction_score >= tablet->tablet_meta()->time_series_compaction_file_count_threshold()) { return transient_size; } @@ -235,8 +239,9 @@ int TimeSeriesCumulativeCompactionPolicy::pick_input_rowsets( if (last_cumu != 0) { int64_t cumu_interval = now - last_cumu; - // Condition 3: the time interval between compactions exceeds the value specified by parameter _compaction_time_threshold_second - if (cumu_interval > (config::time_series_compaction_time_threshold_seconds * 1000)) { + // Condition 3: the time interval between compactions exceeds the value specified by parameter compaction_time_threshold_second + if (cumu_interval > + (tablet->tablet_meta()->time_series_compaction_time_threshold_seconds() * 1000)) { return 
transient_size; } } diff --git a/be/src/olap/cumulative_compaction_time_series_policy.h b/be/src/olap/cumulative_compaction_time_series_policy.h index 3fc3362fa8..1bae5ecfb1 100644 --- a/be/src/olap/cumulative_compaction_time_series_policy.h +++ b/be/src/olap/cumulative_compaction_time_series_policy.h @@ -21,13 +21,13 @@ namespace doris { -inline std::string_view CUMULATIVE_TIME_SERIES_POLICY = "time_series"; +inline constexpr std::string_view CUMULATIVE_TIME_SERIES_POLICY = "time_series"; /// TimeSeries cumulative compaction policy implementation. /// The following three conditions will be considered when calculating compaction scores and selecting input rowsets in this policy: -/// Condition 1: the size of input files for compaction meets the requirement of parameter _compaction_goal_size -/// Condition 2: the number of input files reaches the threshold specified by parameter _compaction_file_count_threshold -/// Condition 3: the time interval between compactions exceeds the value specified by parameter _compaction_time_threshold_seconds +/// Condition 1: the size of input files for compaction meets the requirement of time_series_compaction_goal_size_mbytes +/// Condition 2: the number of input files reaches the threshold specified by time_series_compaction_file_count_threshold +/// Condition 3: the time interval between compactions exceeds the value specified by time_series_compaction_time_threshold_seconds /// The conditions are evaluated sequentially, starting with Condition 1. /// If any condition is met, the compaction score calculation or selection of input rowsets will be successful. 
class TimeSeriesCumulativeCompactionPolicy final : public CumulativeCompactionPolicy { diff --git a/be/src/olap/olap_server.cpp b/be/src/olap/olap_server.cpp index dab16000c0..705d803929 100644 --- a/be/src/olap/olap_server.cpp +++ b/be/src/olap/olap_server.cpp @@ -52,6 +52,7 @@ #include "olap/cold_data_compaction.h" #include "olap/compaction_permit_limiter.h" #include "olap/cumulative_compaction_policy.h" +#include "olap/cumulative_compaction_time_series_policy.h" #include "olap/data_dir.h" #include "olap/olap_common.h" #include "olap/rowset/beta_rowset_writer.h" @@ -834,7 +835,7 @@ std::vector<TabletSharedPtr> StorageEngine::_generate_compaction_tasks( compaction_type == CompactionType::CUMULATIVE_COMPACTION ? copied_cumu_map[data_dir] : copied_base_map[data_dir], - &disk_max_score, _cumulative_compaction_policy); + &disk_max_score, _cumulative_compaction_policies); if (tablet != nullptr) { if (!tablet->tablet_meta()->tablet_schema()->disable_auto_compaction()) { if (need_pick_tablet) { @@ -864,9 +865,13 @@ std::vector<TabletSharedPtr> StorageEngine::_generate_compaction_tasks( } void StorageEngine::_update_cumulative_compaction_policy() { - if (_cumulative_compaction_policy == nullptr) { - _cumulative_compaction_policy = - CumulativeCompactionPolicyFactory::create_cumulative_compaction_policy(); + if (_cumulative_compaction_policies.empty()) { + _cumulative_compaction_policies[CUMULATIVE_SIZE_BASED_POLICY] = + CumulativeCompactionPolicyFactory::create_cumulative_compaction_policy( + CUMULATIVE_SIZE_BASED_POLICY); + _cumulative_compaction_policies[CUMULATIVE_TIME_SERIES_POLICY] = + CumulativeCompactionPolicyFactory::create_cumulative_compaction_policy( + CUMULATIVE_TIME_SERIES_POLICY); } } @@ -976,8 +981,13 @@ Status StorageEngine::_submit_compaction_task(TabletSharedPtr tablet, Status StorageEngine::submit_compaction_task(TabletSharedPtr tablet, CompactionType compaction_type, bool force) { _update_cumulative_compaction_policy(); - if 
(tablet->get_cumulative_compaction_policy() == nullptr) { - tablet->set_cumulative_compaction_policy(_cumulative_compaction_policy); + // alter table tableName set ("compaction_policy"="time_series") + // if alter table's compaction policy, we need to modify tablet compaction policy shared ptr + if (tablet->get_cumulative_compaction_policy() == nullptr || + tablet->get_cumulative_compaction_policy()->name() != + tablet->tablet_meta()->compaction_policy()) { + tablet->set_cumulative_compaction_policy( + _cumulative_compaction_policies.at(tablet->tablet_meta()->compaction_policy())); } tablet->set_skip_compaction(false); return _submit_compaction_task(tablet, compaction_type, force); diff --git a/be/src/olap/storage_engine.h b/be/src/olap/storage_engine.h index 585f26902a..25f565f031 100644 --- a/be/src/olap/storage_engine.h +++ b/be/src/olap/storage_engine.h @@ -460,7 +460,9 @@ private: std::shared_ptr<StreamLoadRecorder> _stream_load_recorder; - std::shared_ptr<CumulativeCompactionPolicy> _cumulative_compaction_policy; + // we use unordered_map to store all cumulative compaction policy shared ptr + std::unordered_map<std::string_view, std::shared_ptr<CumulativeCompactionPolicy>> + _cumulative_compaction_policies; scoped_refptr<Thread> _cooldown_tasks_producer_thread; scoped_refptr<Thread> _remove_unused_remote_files_thread; diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp index 19c8a8467e..13a887c429 100644 --- a/be/src/olap/tablet.cpp +++ b/be/src/olap/tablet.cpp @@ -284,7 +284,8 @@ Status Tablet::_init_once_action() { #ifdef BE_TEST // init cumulative compaction policy by type _cumulative_compaction_policy = - CumulativeCompactionPolicyFactory::create_cumulative_compaction_policy(); + CumulativeCompactionPolicyFactory::create_cumulative_compaction_policy( + _tablet_meta->compaction_policy()); #endif RowsetVector rowset_vec; @@ -1062,7 +1063,7 @@ uint32_t Tablet::_calc_base_compaction_score() const { // In the time series compaction policy, we want
the base compaction to be triggered // when there are delete versions present. - if (config::compaction_policy == CUMULATIVE_TIME_SERIES_POLICY) { + if (_tablet_meta->compaction_policy() == CUMULATIVE_TIME_SERIES_POLICY) { return (base_rowset_exist && has_delete) ? score : 0; } diff --git a/be/src/olap/tablet_manager.cpp b/be/src/olap/tablet_manager.cpp index 8b68e62406..36d19cf8ed 100644 --- a/be/src/olap/tablet_manager.cpp +++ b/be/src/olap/tablet_manager.cpp @@ -40,6 +40,7 @@ #include "gutil/strings/strcat.h" #include "gutil/strings/substitute.h" #include "io/fs/local_file_system.h" +#include "olap/cumulative_compaction_time_series_policy.h" #include "olap/data_dir.h" #include "olap/olap_common.h" #include "olap/olap_define.h" @@ -678,7 +679,8 @@ void TabletManager::get_tablet_stat(TTabletStatResult* result) { TabletSharedPtr TabletManager::find_best_tablet_to_compaction( CompactionType compaction_type, DataDir* data_dir, const std::unordered_set<TTabletId>& tablet_submitted_compaction, uint32_t* score, - std::shared_ptr<CumulativeCompactionPolicy> cumulative_compaction_policy) { + const std::unordered_map<std::string_view, std::shared_ptr<CumulativeCompactionPolicy>>& + all_cumulative_compaction_policies) { int64_t now_ms = UnixMillis(); const string& compaction_type_str = compaction_type == CompactionType::BASE_COMPACTION ? 
"base" : "cumulative"; @@ -729,7 +731,8 @@ TabletSharedPtr TabletManager::find_best_tablet_to_compaction( continue; } } - + auto cumulative_compaction_policy = all_cumulative_compaction_policies.at( + tablet_ptr->tablet_meta()->compaction_policy()); uint32_t current_compaction_score = tablet_ptr->calc_compaction_score( compaction_type, cumulative_compaction_policy); if (current_compaction_score < 5) { diff --git a/be/src/olap/tablet_manager.h b/be/src/olap/tablet_manager.h index f7f41dcef1..2371da868a 100644 --- a/be/src/olap/tablet_manager.h +++ b/be/src/olap/tablet_manager.h @@ -77,7 +77,8 @@ public: TabletSharedPtr find_best_tablet_to_compaction( CompactionType compaction_type, DataDir* data_dir, const std::unordered_set<TTabletId>& tablet_submitted_compaction, uint32_t* score, - std::shared_ptr<CumulativeCompactionPolicy> cumulative_compaction_policy); + const std::unordered_map<std::string_view, std::shared_ptr<CumulativeCompactionPolicy>>& + all_cumulative_compaction_policies); TabletSharedPtr get_tablet(TTabletId tablet_id, bool include_deleted = false, std::string* err = nullptr); diff --git a/be/src/olap/tablet_meta.cpp b/be/src/olap/tablet_meta.cpp index 01d206ebe9..612c50280d 100644 --- a/be/src/olap/tablet_meta.cpp +++ b/be/src/olap/tablet_meta.cpp @@ -65,7 +65,10 @@ Status TabletMeta::create(const TCreateTabletReq& request, const TabletUid& tabl request.__isset.enable_unique_key_merge_on_write ? 
request.enable_unique_key_merge_on_write : false, - std::move(binlog_config)); + std::move(binlog_config), request.compaction_policy, + request.time_series_compaction_goal_size_mbytes, + request.time_series_compaction_file_count_threshold, + request.time_series_compaction_time_threshold_seconds); return Status::OK(); } @@ -81,7 +84,10 @@ TabletMeta::TabletMeta(int64_t table_id, int64_t partition_id, int64_t tablet_id TabletUid tablet_uid, TTabletType::type tabletType, TCompressionType::type compression_type, int64_t storage_policy_id, bool enable_unique_key_merge_on_write, - std::optional<TBinlogConfig> binlog_config) + std::optional<TBinlogConfig> binlog_config, std::string compaction_policy, + int64_t time_series_compaction_goal_size_mbytes, + int64_t time_series_compaction_file_count_threshold, + int64_t time_series_compaction_time_threshold_seconds) : _tablet_uid(0, 0), _schema(new TabletSchema), _delete_bitmap(new DeleteBitmap(tablet_id)) { @@ -102,6 +108,13 @@ TabletMeta::TabletMeta(int64_t table_id, int64_t partition_id, int64_t tablet_id : TabletTypePB::TABLET_TYPE_MEMORY); tablet_meta_pb.set_enable_unique_key_merge_on_write(enable_unique_key_merge_on_write); tablet_meta_pb.set_storage_policy_id(storage_policy_id); + tablet_meta_pb.set_compaction_policy(compaction_policy); + tablet_meta_pb.set_time_series_compaction_goal_size_mbytes( + time_series_compaction_goal_size_mbytes); + tablet_meta_pb.set_time_series_compaction_file_count_threshold( + time_series_compaction_file_count_threshold); + tablet_meta_pb.set_time_series_compaction_time_threshold_seconds( + time_series_compaction_time_threshold_seconds); TabletSchemaPB* schema = tablet_meta_pb.mutable_schema(); schema->set_num_short_key_columns(tablet_schema.short_key_column_count); schema->set_num_rows_per_row_block(config::default_num_rows_per_column_file_block); @@ -266,7 +279,6 @@ TabletMeta::TabletMeta(int64_t table_id, int64_t partition_id, int64_t tablet_id if 
(tablet_schema.__isset.skip_write_index_on_load) { schema->set_skip_write_index_on_load(tablet_schema.skip_write_index_on_load); } - if (binlog_config.has_value()) { BinlogConfig tmp_binlog_config; tmp_binlog_config = binlog_config.value(); @@ -298,7 +310,13 @@ TabletMeta::TabletMeta(const TabletMeta& b) _cooldown_meta_id(b._cooldown_meta_id), _enable_unique_key_merge_on_write(b._enable_unique_key_merge_on_write), _delete_bitmap(b._delete_bitmap), - _binlog_config(b._binlog_config) {}; + _binlog_config(b._binlog_config), + _compaction_policy(b._compaction_policy), + _time_series_compaction_goal_size_mbytes(b._time_series_compaction_goal_size_mbytes), + _time_series_compaction_file_count_threshold( + b._time_series_compaction_file_count_threshold), + _time_series_compaction_time_threshold_seconds( + b._time_series_compaction_time_threshold_seconds) {}; void TabletMeta::init_column_from_tcolumn(uint32_t unique_id, const TColumn& tcolumn, ColumnPB* column) { @@ -562,6 +580,13 @@ void TabletMeta::init_from_pb(const TabletMetaPB& tablet_meta_pb) { if (tablet_meta_pb.has_binlog_config()) { _binlog_config = tablet_meta_pb.binlog_config(); } + _compaction_policy = tablet_meta_pb.compaction_policy(); + _time_series_compaction_goal_size_mbytes = + tablet_meta_pb.time_series_compaction_goal_size_mbytes(); + _time_series_compaction_file_count_threshold = + tablet_meta_pb.time_series_compaction_file_count_threshold(); + _time_series_compaction_time_threshold_seconds = + tablet_meta_pb.time_series_compaction_time_threshold_seconds(); } void TabletMeta::to_meta_pb(TabletMetaPB* tablet_meta_pb) { @@ -636,6 +661,13 @@ void TabletMeta::to_meta_pb(TabletMetaPB* tablet_meta_pb) { } } _binlog_config.to_pb(tablet_meta_pb->mutable_binlog_config()); + tablet_meta_pb->set_compaction_policy(compaction_policy()); + tablet_meta_pb->set_time_series_compaction_goal_size_mbytes( + time_series_compaction_goal_size_mbytes()); + tablet_meta_pb->set_time_series_compaction_file_count_threshold( + 
time_series_compaction_file_count_threshold()); + tablet_meta_pb->set_time_series_compaction_time_threshold_seconds( + time_series_compaction_time_threshold_seconds()); } uint32_t TabletMeta::mem_size() const { @@ -860,6 +892,15 @@ bool operator==(const TabletMeta& a, const TabletMeta& b) { if (a._in_restore_mode != b._in_restore_mode) return false; if (a._preferred_rowset_type != b._preferred_rowset_type) return false; if (a._storage_policy_id != b._storage_policy_id) return false; + if (a._compaction_policy != b._compaction_policy) return false; + if (a._time_series_compaction_goal_size_mbytes != b._time_series_compaction_goal_size_mbytes) + return false; + if (a._time_series_compaction_file_count_threshold != + b._time_series_compaction_file_count_threshold) + return false; + if (a._time_series_compaction_time_threshold_seconds != + b._time_series_compaction_time_threshold_seconds) + return false; return true; } diff --git a/be/src/olap/tablet_meta.h b/be/src/olap/tablet_meta.h index 70830b82de..c9ddff2417 100644 --- a/be/src/olap/tablet_meta.h +++ b/be/src/olap/tablet_meta.h @@ -106,7 +106,11 @@ public: TabletUid tablet_uid, TTabletType::type tabletType, TCompressionType::type compression_type, int64_t storage_policy_id = 0, bool enable_unique_key_merge_on_write = false, - std::optional<TBinlogConfig> binlog_config = {}); + std::optional<TBinlogConfig> binlog_config = {}, + std::string compaction_policy = "size_based", + int64_t time_series_compaction_goal_size_mbytes = 1024, + int64_t time_series_compaction_file_count_threshold = 2000, + int64_t time_series_compaction_time_threshold_seconds = 3600); // If need add a filed in TableMeta, filed init copy in copy construct function TabletMeta(const TabletMeta& tablet_meta); TabletMeta(TabletMeta&& tablet_meta) = delete; @@ -228,6 +232,29 @@ public: _binlog_config = std::move(binlog_config); } + void set_compaction_policy(std::string compaction_policy) { + _compaction_policy = compaction_policy; + } + std::string 
compaction_policy() const { return _compaction_policy; } + void set_time_series_compaction_goal_size_mbytes(int64_t goal_size_mbytes) { + _time_series_compaction_goal_size_mbytes = goal_size_mbytes; + } + int64_t time_series_compaction_goal_size_mbytes() const { + return _time_series_compaction_goal_size_mbytes; + } + void set_time_series_compaction_file_count_threshold(int64_t file_count_threshold) { + _time_series_compaction_file_count_threshold = file_count_threshold; + } + int64_t time_series_compaction_file_count_threshold() const { + return _time_series_compaction_file_count_threshold; + } + void set_time_series_compaction_time_threshold_seconds(int64_t time_threshold) { + _time_series_compaction_time_threshold_seconds = time_threshold; + } + int64_t time_series_compaction_time_threshold_seconds() const { + return _time_series_compaction_time_threshold_seconds; + } + private: Status _save_meta(DataDir* data_dir); @@ -274,6 +301,12 @@ private: // binlog config BinlogConfig _binlog_config {}; + // meta for compaction + std::string _compaction_policy; + int64_t _time_series_compaction_goal_size_mbytes = 0; + int64_t _time_series_compaction_file_count_threshold = 0; + int64_t _time_series_compaction_time_threshold_seconds = 0; + mutable std::shared_mutex _meta_lock; }; diff --git a/be/test/olap/cumulative_compaction_policy_test.cpp b/be/test/olap/cumulative_compaction_policy_test.cpp index 83b779ceb1..dc14907e3e 100644 --- a/be/test/olap/cumulative_compaction_policy_test.cpp +++ b/be/test/olap/cumulative_compaction_policy_test.cpp @@ -340,7 +340,8 @@ TEST_F(TestSizeBasedCumulativeCompactionPolicy, calc_cumulative_compaction_score _tablet->calculate_cumulative_point(); std::shared_ptr<CumulativeCompactionPolicy> cumulative_compaction_policy = - CumulativeCompactionPolicyFactory::create_cumulative_compaction_policy(); + CumulativeCompactionPolicyFactory::create_cumulative_compaction_policy( + CUMULATIVE_SIZE_BASED_POLICY); const uint32_t score = 
_tablet->calc_compaction_score(CompactionType::CUMULATIVE_COMPACTION, cumulative_compaction_policy); @@ -359,7 +360,8 @@ TEST_F(TestSizeBasedCumulativeCompactionPolicy, calc_cumulative_compaction_score _tablet->init(); _tablet->calculate_cumulative_point(); std::shared_ptr<CumulativeCompactionPolicy> cumulative_compaction_policy = - CumulativeCompactionPolicyFactory::create_cumulative_compaction_policy(); + CumulativeCompactionPolicyFactory::create_cumulative_compaction_policy( + CUMULATIVE_SIZE_BASED_POLICY); const uint32_t score = _tablet->calc_compaction_score(CompactionType::CUMULATIVE_COMPACTION, cumulative_compaction_policy); diff --git a/be/test/olap/cumulative_compaction_time_series_policy_test.cpp b/be/test/olap/cumulative_compaction_time_series_policy_test.cpp index 74bcbe70ac..f9b9117233 100644 --- a/be/test/olap/cumulative_compaction_time_series_policy_test.cpp +++ b/be/test/olap/cumulative_compaction_time_series_policy_test.cpp @@ -35,17 +35,17 @@ namespace doris { class TestTimeSeriesCumulativeCompactionPolicy : public testing::Test { public: - TestTimeSeriesCumulativeCompactionPolicy() {} + TestTimeSeriesCumulativeCompactionPolicy() = default; void SetUp() { - config::compaction_policy = "time_series"; - config::time_series_compaction_goal_size_mbytes = 1024; - config::time_series_compaction_file_count_threshold = 10; - config::time_series_compaction_time_threshold_seconds = 3600; - _tablet_meta = static_cast<TabletMetaSharedPtr>(new TabletMeta( 1, 2, 15673, 15674, 4, 5, TTabletSchema(), 6, {{7, 8}}, UniqueId(9, 10), TTabletType::TABLET_TYPE_DISK, TCompressionType::LZ4F)); + _tablet_meta->set_compaction_policy(std::string(CUMULATIVE_TIME_SERIES_POLICY)); + _tablet_meta->set_time_series_compaction_goal_size_mbytes(100); + _tablet_meta->set_time_series_compaction_file_count_threshold(10); + _tablet_meta->set_time_series_compaction_time_threshold_seconds(3600); + _json_rowset_meta = R"({ "rowset_id": 540081, "tablet_id": 15673, @@ -106,28 +106,28 @@ 
public: void init_rs_meta_big_rowset(std::vector<RowsetMetaSharedPtr>* rs_metas) { RowsetMetaSharedPtr ptr1(new RowsetMeta()); init_rs_meta(ptr1, 0, 1); - ptr1->set_total_disk_size(1024 * 1024 * 1024); + ptr1->set_total_disk_size(100 * 1024 * 1024); rs_metas->push_back(ptr1); RowsetMetaSharedPtr ptr2(new RowsetMeta()); init_rs_meta(ptr2, 2, 3); - ptr2->set_total_disk_size(1024 * 1024 * 1024); + ptr2->set_total_disk_size(100 * 1024 * 1024); rs_metas->push_back(ptr2); RowsetMetaSharedPtr ptr3(new RowsetMeta()); init_rs_meta(ptr3, 4, 4); - ptr3->set_total_disk_size(512 * 1024 * 1024); + ptr3->set_total_disk_size(51 * 1024 * 1024); rs_metas->push_back(ptr3); RowsetMetaSharedPtr ptr4(new RowsetMeta()); init_rs_meta(ptr4, 5, 5); ptr4->set_segments_overlap(OVERLAPPING); - ptr4->set_total_disk_size(512 * 1024 * 1024); + ptr4->set_total_disk_size(51 * 1024 * 1024); rs_metas->push_back(ptr4); RowsetMetaSharedPtr ptr5(new RowsetMeta()); init_rs_meta(ptr5, 6, 6); - ptr5->set_total_disk_size(512 * 1024 * 1024); + ptr5->set_total_disk_size(51 * 1024 * 1024); rs_metas->push_back(ptr5); } @@ -336,7 +336,8 @@ TEST_F(TestTimeSeriesCumulativeCompactionPolicy, calc_cumulative_compaction_scor _tablet->calculate_cumulative_point(); std::shared_ptr<CumulativeCompactionPolicy> cumulative_compaction_policy = - CumulativeCompactionPolicyFactory::create_cumulative_compaction_policy(); + CumulativeCompactionPolicyFactory::create_cumulative_compaction_policy( + CUMULATIVE_TIME_SERIES_POLICY); const uint32_t score = _tablet->calc_compaction_score(CompactionType::CUMULATIVE_COMPACTION, cumulative_compaction_policy); @@ -355,7 +356,8 @@ TEST_F(TestTimeSeriesCumulativeCompactionPolicy, calc_cumulative_compaction_scor _tablet->init(); _tablet->calculate_cumulative_point(); std::shared_ptr<CumulativeCompactionPolicy> cumulative_compaction_policy = - CumulativeCompactionPolicyFactory::create_cumulative_compaction_policy(); + CumulativeCompactionPolicyFactory::create_cumulative_compaction_policy( + 
CUMULATIVE_TIME_SERIES_POLICY); const uint32_t score = _tablet->calc_compaction_score(CompactionType::CUMULATIVE_COMPACTION, cumulative_compaction_policy); diff --git a/docs/en/docs/admin-manual/config/be-config.md b/docs/en/docs/admin-manual/config/be-config.md index 1457e935d4..1765d1ca88 100644 --- a/docs/en/docs/admin-manual/config/be-config.md +++ b/docs/en/docs/admin-manual/config/be-config.md @@ -659,33 +659,6 @@ BaseCompaction:546859: * Description: Minimal interval (s) to update peer replica infos * Default value: 60 (s) -#### `compaction_policy` - -* Type: string -* Description: Configure the compaction strategy in the compression phase. Currently, two compaction strategies are implemented, size_based and time_series. - - size_based: Version merging can only be performed when the disk volume of the rowset is the same order of magnitude. After merging, qualified rowsets are promoted to the base compaction stage. In the case of a large number of small batch imports, it can reduce the write magnification of base compact, balance the read magnification and space magnification, and reduce the data of file versions. - - time_series: When the disk size of a rowset accumulates to a certain threshold, version merging takes place. The merged rowset is directly promoted to the base compaction stage. This approach effectively reduces the write amplification rate of compaction, especially in scenarios with continuous imports in a time series context. -* Default value: size_based - -#### `time_series_compaction_goal_size_mbytes` - -* Type: int64 -* Description: Enabling time series compaction will utilize this parameter to adjust the size of input files for each compaction. The output file size will be approximately equal to the input file size. 
-* Default value: 512 - -#### `time_series_compaction_file_count_threshold` - -* Type: int64 -* Description: Enabling time series compaction will utilize this parameter to adjust the minimum number of input files for each compaction. It comes into effect only when the condition specified by time_series_compaction_goal_size_mbytes is not met. - - If the number of files in a tablet exceeds the configured threshold, it will trigger a compaction process. -* Default value: 2000 - -#### `time_series_compaction_time_threshold_seconds` - -* Type: int64 -* Description: When time series compaction is enabled, a significant duration passes without a compaction being executed, a compaction will be triggered. -* Default value: 3600 (s) - ### Load diff --git a/docs/en/docs/sql-manual/sql-reference/Data-Definition-Statements/Create/CREATE-TABLE.md b/docs/en/docs/sql-manual/sql-reference/Data-Definition-Statements/Create/CREATE-TABLE.md index ab9e3a6f0f..2727e2f734 100644 --- a/docs/en/docs/sql-manual/sql-reference/Data-Definition-Statements/Create/CREATE-TABLE.md +++ b/docs/en/docs/sql-manual/sql-reference/Data-Definition-Statements/Create/CREATE-TABLE.md @@ -409,6 +409,37 @@ Set table properties. The following attributes are currently supported: `"skip_write_index_on_load" = "false"` +* `compaction_policy` + + Configure the compaction strategy in the compression phase. Only support configuring the compaction policy as "time_series" or "size_based". + + time_series: When the disk size of a rowset accumulates to a certain threshold, version merging takes place. The merged rowset is directly promoted to the base compaction stage. This approach effectively reduces the write amplification rate of compaction, especially in scenarios with continuous imports in a time series context. + + In the case of time series compaction, the execution of compaction is adjusted using parameters that have the prefix time_series_compaction. 
+ + `"compaction_policy" = "time_series"` + +* `time_series_compaction_goal_size_mbytes` + + Time series compaction policy will utilize this parameter to adjust the size of input files for each compaction. The output file size will be approximately equal to the input file size. + + `"time_series_compaction_goal_size_mbytes" = "1024"` + +* `time_series_compaction_file_count_threshold` + + Time series compaction policy will utilize this parameter to adjust the minimum number of input files for each compaction. + + If the number of files in a tablet exceeds the configured threshold, it will trigger a compaction process. + + `"time_series_compaction_file_count_threshold" = "2000"` + +* `time_series_compaction_time_threshold_seconds` + + When the time series compaction policy is applied and a significant duration passes without a compaction being executed, a compaction will be triggered. + + `"time_series_compaction_time_threshold_seconds" = "3600"` + + * Dynamic partition related The relevant parameters of dynamic partition are as follows: diff --git a/docs/zh-CN/docs/admin-manual/config/be-config.md b/docs/zh-CN/docs/admin-manual/config/be-config.md index 75bdda8533..2a704cb8e9 100644 --- a/docs/zh-CN/docs/admin-manual/config/be-config.md +++ b/docs/zh-CN/docs/admin-manual/config/be-config.md @@ -673,33 +673,6 @@ BaseCompaction:546859: * 描述:更新 peer replica infos 的最小间隔时间 * 默认值:60(s) -#### `compaction_policy` - -* 类型:string -* 描述:配置 compaction 的合并策略,目前实现了两种合并策略,size_based 和 time_series - - size_based: 仅当 rowset 的磁盘体积在相同数量级时才进行版本合并。合并之后满足条件的 rowset 进行晋升到 base compaction阶段。能够做到在大量小批量导入的情况下:降低base compact的写入放大率,并在读取放大率和空间放大率之间进行权衡,同时减少了文件版本的数据。 - - time_series: 当 rowset 的磁盘体积积攒到一定大小时进行版本合并。合并后的 rowset 直接晋升到 base compaction 阶段。在时序场景持续导入的情况下有效降低 compact 的写入放大率。 -* 默认值:size_based - -#### `time_series_compaction_goal_size_mbytes` - -* 类型:int64 -* 描述:开启 time series compaction 时,将使用此参数来调整每次 compaction 输入的文件的大小,输出的文件大小和输入相当 -* 默认值:512 - -#### `time_series_compaction_file_count_threshold` - -* 
类型:int64 -* 描述:开启 time series compaction 时,将使用此参数来调整每次 compaction 输入的文件数量的最小值,只有当 time_series_compaction_goal_size_mbytes 条件不满足时,该参数才会发挥作用 - - 一个 tablet 中文件数超过该配置,会触发 compaction -* 默认值:2000 - -#### `time_series_compaction_time_threshold_seconds` - -* 类型:int64 -* 描述:开启 time series compaction 时,将使用此参数来调整 compaction 的最长时间间隔,即长时间未执行过 compaction 时,就会触发一次 compaction,单位为秒 -* 默认值:3600 - ### 导入 diff --git a/docs/zh-CN/docs/sql-manual/sql-reference/Data-Definition-Statements/Create/CREATE-TABLE.md b/docs/zh-CN/docs/sql-manual/sql-reference/Data-Definition-Statements/Create/CREATE-TABLE.md index 2fc5d23009..0d14ba3f4e 100644 --- a/docs/zh-CN/docs/sql-manual/sql-reference/Data-Definition-Statements/Create/CREATE-TABLE.md +++ b/docs/zh-CN/docs/sql-manual/sql-reference/Data-Definition-Statements/Create/CREATE-TABLE.md @@ -393,6 +393,36 @@ UNIQUE KEY(k1, k2) `"skip_write_index_on_load" = "false"` +* `compaction_policy` + + 配置这个表的 compaction 的合并策略,仅支持配置为 time_series 或者 size_based + + time_series: 当 rowset 的磁盘体积积攒到一定大小时进行版本合并。合并后的 rowset 直接晋升到 base compaction 阶段。在时序场景持续导入的情况下有效降低 compact 的写入放大率 + + 此策略将使用 time_series_compaction 为前缀的参数调整 compaction 的执行 + + `"compaction_policy" = ""` + +* `time_series_compaction_goal_size_mbytes` + + compaction 的合并策略为 time_series 时,将使用此参数来调整每次 compaction 输入的文件的大小,输出的文件大小和输入相当 + + `"time_series_compaction_goal_size_mbytes" = "1024"` + +* `time_series_compaction_file_count_threshold` + + compaction 的合并策略为 time_series 时,将使用此参数来调整每次 compaction 输入的文件数量的最小值 + + 一个 tablet 中,文件数超过该配置,就会触发 compaction + + `"time_series_compaction_file_count_threshold" = "2000"` + +* `time_series_compaction_time_threshold_seconds` + + compaction 的合并策略为 time_series 时,将使用此参数来调整 compaction 的最长时间间隔,即长时间未执行过 compaction 时,就会触发一次 compaction,单位为秒 + + `"time_series_compaction_time_threshold_seconds" = "3600"` + * 动态分区相关 动态分区相关参数如下: diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java b/fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java index 
53eb9052e8..f8c90b8a52 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java @@ -211,6 +211,28 @@ public class Alter { } else if (currentAlterOps.checkIsBeingSynced(alterClauses)) { olapTable.setIsBeingSynced(currentAlterOps.isBeingSynced(alterClauses)); needProcessOutsideTableLock = true; + } else if (currentAlterOps.checkCompactionPolicy(alterClauses) + && currentAlterOps.getCompactionPolicy(alterClauses) != olapTable.getCompactionPolicy()) { + olapTable.setCompactionPolicy(currentAlterOps.getCompactionPolicy(alterClauses)); + needProcessOutsideTableLock = true; + } else if (currentAlterOps.checkTimeSeriesCompactionGoalSizeMbytes(alterClauses) + && currentAlterOps.getTimeSeriesCompactionGoalSizeMbytes(alterClauses) + != olapTable.getTimeSeriesCompactionGoalSizeMbytes()) { + olapTable.setTimeSeriesCompactionGoalSizeMbytes(currentAlterOps + .getTimeSeriesCompactionGoalSizeMbytes(alterClauses)); + needProcessOutsideTableLock = true; + } else if (currentAlterOps.checkTimeSeriesCompactionFileCountThreshold(alterClauses) + && currentAlterOps.getTimeSeriesCompactionFileCountThreshold(alterClauses) + != olapTable.getTimeSeriesCompactionFileCountThreshold()) { + olapTable.setTimeSeriesCompactionFileCountThreshold(currentAlterOps + .getTimeSeriesCompactionFileCountThreshold(alterClauses)); + needProcessOutsideTableLock = true; + } else if (currentAlterOps.checkTimeSeriesCompactionTimeThresholdSeconds(alterClauses) + && currentAlterOps.getTimeSeriesCompactionTimeThresholdSeconds(alterClauses) + != olapTable.getTimeSeriesCompactionTimeThresholdSeconds()) { + olapTable.setTimeSeriesCompactionTimeThresholdSeconds(currentAlterOps + .getTimeSeriesCompactionTimeThresholdSeconds(alterClauses)); + needProcessOutsideTableLock = true; } else if (currentAlterOps.checkBinlogConfigChange(alterClauses)) { if (!Config.enable_feature_binlog) { throw new DdlException("Binlog feature is not enabled"); @@ 
-514,7 +536,13 @@ public class Alter { // currently, only in memory and storage policy property could reach here Preconditions.checkState(properties.containsKey(PropertyAnalyzer.PROPERTIES_INMEMORY) || properties.containsKey(PropertyAnalyzer.PROPERTIES_STORAGE_POLICY) - || properties.containsKey(PropertyAnalyzer.PROPERTIES_IS_BEING_SYNCED)); + || properties.containsKey(PropertyAnalyzer.PROPERTIES_IS_BEING_SYNCED) + || properties.containsKey(PropertyAnalyzer.PROPERTIES_COMPACTION_POLICY) + || properties.containsKey(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_GOAL_SIZE_MBYTES) + || properties + .containsKey(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_FILE_COUNT_THRESHOLD) + || properties + .containsKey(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_TIME_THRESHOLD_SECONDS)); ((SchemaChangeHandler) schemaChangeHandler).updateTableProperties(db, tableName, properties); } else { throw new DdlException("Invalid alter operation: " + alterClause.getOpType()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/AlterOperations.java b/fe/fe-core/src/main/java/org/apache/doris/alter/AlterOperations.java index 22104522ed..62721fa37e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/AlterOperations.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/AlterOperations.java @@ -93,6 +93,60 @@ public class AlterOperations { || clause.getProperties().containsKey(PropertyAnalyzer.PROPERTIES_BINLOG_MAX_HISTORY_NUMS)); } + public boolean checkCompactionPolicy(List<AlterClause> alterClauses) { + return alterClauses.stream().filter(clause -> + clause instanceof ModifyTablePropertiesClause + ).anyMatch(clause -> clause.getProperties().containsKey(PropertyAnalyzer.PROPERTIES_COMPACTION_POLICY)); + } + + public String getCompactionPolicy(List<AlterClause> alterClauses) { + return alterClauses.stream().filter(clause -> + clause instanceof ModifyTablePropertiesClause + ).map(c -> ((ModifyTablePropertiesClause) 
c).compactionPolicy()).findFirst().orElse(""); + } + + public boolean checkTimeSeriesCompactionGoalSizeMbytes(List<AlterClause> alterClauses) { + return alterClauses.stream().filter(clause -> + clause instanceof ModifyTablePropertiesClause + ).anyMatch(clause -> clause.getProperties() + .containsKey(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_GOAL_SIZE_MBYTES)); + } + + public long getTimeSeriesCompactionGoalSizeMbytes(List<AlterClause> alterClauses) { + return alterClauses.stream().filter(clause -> + clause instanceof ModifyTablePropertiesClause + ).map(c -> ((ModifyTablePropertiesClause) c) + .timeSeriesCompactionGoalSizeMbytes()).findFirst().orElse((long) -1); + } + + public boolean checkTimeSeriesCompactionFileCountThreshold(List<AlterClause> alterClauses) { + return alterClauses.stream().filter(clause -> + clause instanceof ModifyTablePropertiesClause + ).anyMatch(clause -> clause.getProperties() + .containsKey(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_FILE_COUNT_THRESHOLD)); + } + + public long getTimeSeriesCompactionFileCountThreshold(List<AlterClause> alterClauses) { + return alterClauses.stream().filter(clause -> + clause instanceof ModifyTablePropertiesClause + ).map(c -> ((ModifyTablePropertiesClause) c) + .timeSeriesCompactionFileCountThreshold()).findFirst().orElse((long) -1); + } + + public boolean checkTimeSeriesCompactionTimeThresholdSeconds(List<AlterClause> alterClauses) { + return alterClauses.stream().filter(clause -> + clause instanceof ModifyTablePropertiesClause + ).anyMatch(clause -> clause.getProperties() + .containsKey(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_TIME_THRESHOLD_SECONDS)); + } + + public long getTimeSeriesCompactionTimeThresholdSeconds(List<AlterClause> alterClauses) { + return alterClauses.stream().filter(clause -> + clause instanceof ModifyTablePropertiesClause + ).map(c -> ((ModifyTablePropertiesClause) c) + .timeSeriesCompactionTimeThresholdSeconds()).findFirst().orElse((long) -1); + } + public 
boolean isBeingSynced(List<AlterClause> alterClauses) { return alterClauses.stream().filter(clause -> clause instanceof ModifyTablePropertiesClause diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java b/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java index e05130b103..5ab9f77a88 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java @@ -284,6 +284,10 @@ public class RollupJobV2 extends AlterJobV2 implements GsonPostProcessable { tbl.disableAutoCompaction(), tbl.enableSingleReplicaCompaction(), tbl.skipWriteIndexOnLoad(), + tbl.getCompactionPolicy(), + tbl.getTimeSeriesCompactionGoalSizeMbytes(), + tbl.getTimeSeriesCompactionFileCountThreshold(), + tbl.getTimeSeriesCompactionTimeThresholdSeconds(), tbl.storeRowColumn(), tbl.isDynamicSchema(), binlogConfig); diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java index ff2fcc3fd6..df7fdc81c0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java @@ -2137,13 +2137,43 @@ public class SchemaChangeHandler extends AlterHandler { } long storagePolicyId = storagePolicyNameToId(storagePolicy); - if (isInMemory < 0 && storagePolicyId < 0) { - LOG.info("Properties already up-to-date"); - return; + String compactionPolicy = properties.get(PropertyAnalyzer.PROPERTIES_COMPACTION_POLICY); + if (compactionPolicy != null + && !compactionPolicy.equals(PropertyAnalyzer.TIME_SERIES_COMPACTION_POLICY) + && !compactionPolicy.equals(PropertyAnalyzer.SIZE_BASED_COMPACTION_POLICY)) { + throw new UserException("Table compaction policy only support for " + + PropertyAnalyzer.TIME_SERIES_COMPACTION_POLICY + + " or " + PropertyAnalyzer.SIZE_BASED_COMPACTION_POLICY); + } + + Map<String, Long> 
timeSeriesCompactionConfig = new HashMap<>(); + if (properties.containsKey(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_GOAL_SIZE_MBYTES)) { + timeSeriesCompactionConfig + .put(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_GOAL_SIZE_MBYTES, + Long.parseLong(properties + .get(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_GOAL_SIZE_MBYTES))); + } + if (properties.containsKey(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_FILE_COUNT_THRESHOLD)) { + timeSeriesCompactionConfig + .put(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_FILE_COUNT_THRESHOLD, + Long.parseLong(properties + .get(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_FILE_COUNT_THRESHOLD))); + } + if (properties.containsKey(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_TIME_THRESHOLD_SECONDS)) { + timeSeriesCompactionConfig + .put(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_TIME_THRESHOLD_SECONDS, + Long.parseLong(properties + .get(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_TIME_THRESHOLD_SECONDS))); } for (Partition partition : partitions) { - updatePartitionProperties(db, olapTable.getName(), partition.getName(), storagePolicyId, isInMemory, null); + updatePartitionProperties(db, olapTable.getName(), partition.getName(), storagePolicyId, isInMemory, + null, compactionPolicy, timeSeriesCompactionConfig); + } + + if (isInMemory < 0 && storagePolicyId < 0 && compactionPolicy == null && timeSeriesCompactionConfig.isEmpty()) { + LOG.info("Properties already up-to-date"); + return; } olapTable.writeLockOrDdlException(); @@ -2274,6 +2304,87 @@ public class SchemaChangeHandler extends AlterHandler { } } + /** + * Update one specified partition's properties by partition name of table + * This operation may return partial successfully, with an exception to inform user to retry + */ + public void updatePartitionProperties(Database db, String tableName, String partitionName, long storagePolicyId, + int isInMemory, BinlogConfig binlogConfig, String compactionPolicy, + 
Map<String, Long> timeSeriesCompactionConfig) throws UserException { + // be id -> <tablet id,schemaHash> + Map<Long, Set<Pair<Long, Integer>>> beIdToTabletIdWithHash = Maps.newHashMap(); + OlapTable olapTable = (OlapTable) db.getTableOrMetaException(tableName, Table.TableType.OLAP); + olapTable.readLock(); + try { + Partition partition = olapTable.getPartition(partitionName); + if (partition == null) { + throw new DdlException( + "Partition[" + partitionName + "] does not exist in table[" + olapTable.getName() + "]"); + } + + for (MaterializedIndex index : partition.getMaterializedIndices(IndexExtState.VISIBLE)) { + int schemaHash = olapTable.getSchemaHashByIndexId(index.getId()); + for (Tablet tablet : index.getTablets()) { + for (Replica replica : tablet.getReplicas()) { + Set<Pair<Long, Integer>> tabletIdWithHash = beIdToTabletIdWithHash.computeIfAbsent( + replica.getBackendId(), k -> Sets.newHashSet()); + tabletIdWithHash.add(Pair.of(tablet.getId(), schemaHash)); + } + } + } + } finally { + olapTable.readUnlock(); + } + + int totalTaskNum = beIdToTabletIdWithHash.keySet().size(); + MarkedCountDownLatch<Long, Set<Pair<Long, Integer>>> countDownLatch = new MarkedCountDownLatch<>(totalTaskNum); + AgentBatchTask batchTask = new AgentBatchTask(); + for (Map.Entry<Long, Set<Pair<Long, Integer>>> kv : beIdToTabletIdWithHash.entrySet()) { + countDownLatch.addMark(kv.getKey(), kv.getValue()); + UpdateTabletMetaInfoTask task = new UpdateTabletMetaInfoTask(kv.getKey(), kv.getValue(), isInMemory, + storagePolicyId, binlogConfig, countDownLatch, compactionPolicy, timeSeriesCompactionConfig); + batchTask.addTask(task); + } + if (!FeConstants.runningUnitTest) { + // send all tasks and wait them finished + AgentTaskQueue.addBatchTask(batchTask); + AgentTaskExecutor.submit(batchTask); + LOG.info("send update tablet meta task for table {}, partitions {}, number: {}", tableName, partitionName, + batchTask.getTaskNum()); + + // estimate timeout + long timeout = 
Config.tablet_create_timeout_second * 1000L * totalTaskNum; + timeout = Math.min(timeout, Config.max_create_table_timeout_second * 1000); + boolean ok = false; + try { + ok = countDownLatch.await(timeout, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + LOG.warn("InterruptedException: ", e); + } + + if (!ok || !countDownLatch.getStatus().ok()) { + String errMsg = "Failed to update partition[" + partitionName + "]. tablet meta."; + // clear tasks + AgentTaskQueue.removeBatchTask(batchTask, TTaskType.UPDATE_TABLET_META_INFO); + + if (!countDownLatch.getStatus().ok()) { + errMsg += " Error: " + countDownLatch.getStatus().getErrorMsg(); + } else { + List<Map.Entry<Long, Set<Pair<Long, Integer>>>> unfinishedMarks = countDownLatch.getLeftMarks(); + // only show at most 3 results + List<Map.Entry<Long, Set<Pair<Long, Integer>>>> subList = unfinishedMarks.subList(0, + Math.min(unfinishedMarks.size(), 3)); + if (!subList.isEmpty()) { + errMsg += " Unfinished mark: " + Joiner.on(", ").join(subList); + } + } + errMsg += ". 
This operation maybe partial successfully, You should retry until success."; + LOG.warn(errMsg); + throw new DdlException(errMsg); + } + } + } + @Override public void cancel(CancelStmt stmt) throws DdlException { CancelAlterTableStmt cancelAlterTableStmt = (CancelAlterTableStmt) stmt; diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java index cc10efa951..2be16f6119 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java @@ -279,6 +279,10 @@ public class SchemaChangeJobV2 extends AlterJobV2 { tbl.disableAutoCompaction(), tbl.enableSingleReplicaCompaction(), tbl.skipWriteIndexOnLoad(), + tbl.getCompactionPolicy(), + tbl.getTimeSeriesCompactionGoalSizeMbytes(), + tbl.getTimeSeriesCompactionFileCountThreshold(), + tbl.getTimeSeriesCompactionTimeThresholdSeconds(), tbl.storeRowColumn(), tbl.isDynamicSchema(), binlogConfig); diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/ModifyTablePropertiesClause.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/ModifyTablePropertiesClause.java index 93ece524f2..1ef31b3734 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/ModifyTablePropertiesClause.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/ModifyTablePropertiesClause.java @@ -54,6 +54,46 @@ public class ModifyTablePropertiesClause extends AlterTableClause { return isBeingSynced; } + private String compactionPolicy; + + private long timeSeriesCompactionGoalSizeMbytes; + + private long timeSeriesCompactionFileCountThreshold; + + private long timeSeriesCompactionTimeThresholdSeconds; + + public void setCompactionPolicy(String compactionPolicy) { + this.compactionPolicy = compactionPolicy; + } + + public String compactionPolicy() { + return compactionPolicy; + } + + public void setTimeSeriesCompactionGoalSizeMbytes(long 
timeSeriesCompactionGoalSizeMbytes) { + this.timeSeriesCompactionGoalSizeMbytes = timeSeriesCompactionGoalSizeMbytes; + } + + public long timeSeriesCompactionGoalSizeMbytes() { + return timeSeriesCompactionGoalSizeMbytes; + } + + public void setTimeSeriesCompactionFileCountThreshold(long timeSeriesCompactionFileCountThreshold) { + this.timeSeriesCompactionFileCountThreshold = timeSeriesCompactionFileCountThreshold; + } + + public Long timeSeriesCompactionFileCountThreshold() { + return timeSeriesCompactionFileCountThreshold; + } + + public void setTimeSeriesCompactionTimeThresholdSeconds(long timeSeriesCompactionTimeThresholdSeconds) { + this.timeSeriesCompactionTimeThresholdSeconds = timeSeriesCompactionTimeThresholdSeconds; + } + + public Long timeSeriesCompactionTimeThresholdSeconds() { + return timeSeriesCompactionTimeThresholdSeconds; + } + public ModifyTablePropertiesClause(Map<String, String> properties) { super(AlterOpType.MODIFY_TABLE_PROPERTY); this.properties = properties; @@ -142,6 +182,65 @@ public class ModifyTablePropertiesClause extends AlterTableClause { || properties.containsKey(PropertyAnalyzer.PROPERTIES_BINLOG_MAX_BYTES) || properties.containsKey(PropertyAnalyzer.PROPERTIES_BINLOG_MAX_HISTORY_NUMS)) { // do nothing, will be alter in SchemaChangeHandler.updateBinlogConfig + } else if (properties.containsKey(PropertyAnalyzer.PROPERTIES_COMPACTION_POLICY)) { + String compactionPolicy = properties.getOrDefault(PropertyAnalyzer.PROPERTIES_COMPACTION_POLICY, ""); + if (compactionPolicy != null + && !compactionPolicy.equals(PropertyAnalyzer.TIME_SERIES_COMPACTION_POLICY) + && !compactionPolicy.equals(PropertyAnalyzer.SIZE_BASED_COMPACTION_POLICY)) { + throw new AnalysisException( + "Table compaction policy only support for " + PropertyAnalyzer.TIME_SERIES_COMPACTION_POLICY + + " or " + PropertyAnalyzer.SIZE_BASED_COMPACTION_POLICY); + } + this.needTableStable = false; + setCompactionPolicy(compactionPolicy); + } else if 
(properties.containsKey(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_GOAL_SIZE_MBYTES)) { + long goalSizeMbytes; + String goalSizeMbytesStr = properties + .get(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_GOAL_SIZE_MBYTES); + try { + goalSizeMbytes = Long.parseLong(goalSizeMbytesStr); + if (goalSizeMbytes < 10) { + throw new AnalysisException("time_series_compaction_goal_size_mbytes can not be less than 10:" + + goalSizeMbytesStr); + } + } catch (NumberFormatException e) { + throw new AnalysisException("Invalid time_series_compaction_goal_size_mbytes format: " + + goalSizeMbytesStr); + } + this.needTableStable = false; + setTimeSeriesCompactionGoalSizeMbytes(goalSizeMbytes); + } else if (properties.containsKey(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_FILE_COUNT_THRESHOLD)) { + long fileCountThreshold; + String fileCountThresholdStr = properties + .get(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_FILE_COUNT_THRESHOLD); + try { + fileCountThreshold = Long.parseLong(fileCountThresholdStr); + if (fileCountThreshold < 10) { + throw new AnalysisException("time_series_compaction_file_count_threshold can not be less than 10:" + + fileCountThresholdStr); + } + } catch (NumberFormatException e) { + throw new AnalysisException("Invalid time_series_compaction_file_count_threshold format: " + + fileCountThresholdStr); + } + this.needTableStable = false; + setTimeSeriesCompactionFileCountThreshold(fileCountThreshold); + } else if (properties.containsKey(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_TIME_THRESHOLD_SECONDS)) { + long timeThresholdSeconds; + String timeThresholdSecondsStr = properties + .get(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_TIME_THRESHOLD_SECONDS); + try { + timeThresholdSeconds = Long.parseLong(timeThresholdSecondsStr); + if (timeThresholdSeconds < 60) { + throw new AnalysisException("time_series_compaction_time_threshold_seconds can not be less than 60:" + + timeThresholdSecondsStr); + } + } catch 
(NumberFormatException e) { + throw new AnalysisException("Invalid time_series_compaction_time_threshold_seconds format: " + + timeThresholdSecondsStr); + } + this.needTableStable = false; + setTimeSeriesCompactionTimeThresholdSeconds(timeThresholdSeconds); } else { throw new AnalysisException("Unknown table property: " + properties.keySet()); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java index a5256dac64..e1c0e4a29e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java @@ -1043,6 +1043,10 @@ public class RestoreJob extends AbstractJob { localTbl.disableAutoCompaction(), localTbl.enableSingleReplicaCompaction(), localTbl.skipWriteIndexOnLoad(), + localTbl.getCompactionPolicy(), + localTbl.getTimeSeriesCompactionGoalSizeMbytes(), + localTbl.getTimeSeriesCompactionFileCountThreshold(), + localTbl.getTimeSeriesCompactionTimeThresholdSeconds(), localTbl.storeRowColumn(), localTbl.isDynamicSchema(), binlogConfig); diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java index f1bb519290..90ee5a9f29 100755 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java @@ -3135,6 +3135,37 @@ public class Env { sb.append(olapTable.skipWriteIndexOnLoad()).append("\""); } + // compaction policy + if (olapTable.getCompactionPolicy() != null && !olapTable.getCompactionPolicy().equals("") + && !olapTable.getCompactionPolicy().equals(PropertyAnalyzer.SIZE_BASED_COMPACTION_POLICY)) { + sb.append(",\n\"").append(PropertyAnalyzer.PROPERTIES_COMPACTION_POLICY).append("\" = \""); + sb.append(olapTable.getCompactionPolicy()).append("\""); + } + + // time series compaction goal size + if (olapTable.getCompactionPolicy() != null && 
olapTable.getCompactionPolicy() + .equals(PropertyAnalyzer.TIME_SERIES_COMPACTION_POLICY)) { + sb.append(",\n\"").append(PropertyAnalyzer + .PROPERTIES_TIME_SERIES_COMPACTION_GOAL_SIZE_MBYTES).append("\" = \""); + sb.append(olapTable.getTimeSeriesCompactionGoalSizeMbytes()).append("\""); + } + + // time series compaction file count threshold + if (olapTable.getCompactionPolicy() != null && olapTable.getCompactionPolicy() + .equals(PropertyAnalyzer.TIME_SERIES_COMPACTION_POLICY)) { + sb.append(",\n\"").append(PropertyAnalyzer + .PROPERTIES_TIME_SERIES_COMPACTION_FILE_COUNT_THRESHOLD).append("\" = \""); + sb.append(olapTable.getTimeSeriesCompactionFileCountThreshold()).append("\""); + } + + // time series compaction time threshold + if (olapTable.getCompactionPolicy() != null && olapTable.getCompactionPolicy() + .equals(PropertyAnalyzer.TIME_SERIES_COMPACTION_POLICY)) { + sb.append(",\n\"").append(PropertyAnalyzer + .PROPERTIES_TIME_SERIES_COMPACTION_TIME_THRESHOLD_SECONDS).append("\" = \""); + sb.append(olapTable.getTimeSeriesCompactionTimeThresholdSeconds()).append("\""); + } + // dynamic schema if (olapTable.isDynamicSchema()) { sb.append(",\n\"").append(PropertyAnalyzer.PROPERTIES_DYNAMIC_SCHEMA).append("\" = \""); diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java index ff5e3a8570..a242fa9eb7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java @@ -1844,6 +1844,62 @@ public class OlapTable extends Table { return false; } + public void setCompactionPolicy(String compactionPolicy) { + TableProperty tableProperty = getOrCreatTableProperty(); + tableProperty.modifyTableProperties(PropertyAnalyzer.PROPERTIES_COMPACTION_POLICY, compactionPolicy); + tableProperty.buildCompactionPolicy(); + } + + public String getCompactionPolicy() { + if (tableProperty != null) { + return 
tableProperty.compactionPolicy(); + } + return ""; + } + + public void setTimeSeriesCompactionGoalSizeMbytes(long timeSeriesCompactionGoalSizeMbytes) { + TableProperty tableProperty = getOrCreatTableProperty(); + tableProperty.modifyTableProperties(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_GOAL_SIZE_MBYTES, + Long.valueOf(timeSeriesCompactionGoalSizeMbytes).toString()); + tableProperty.buildTimeSeriesCompactionGoalSizeMbytes(); + } + + public Long getTimeSeriesCompactionGoalSizeMbytes() { + if (tableProperty != null) { + return tableProperty.timeSeriesCompactionGoalSizeMbytes(); + } + return null; + } + + public void setTimeSeriesCompactionFileCountThreshold(long timeSeriesCompactionFileCountThreshold) { + TableProperty tableProperty = getOrCreatTableProperty(); + tableProperty.modifyTableProperties(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_FILE_COUNT_THRESHOLD, + Long.valueOf(timeSeriesCompactionFileCountThreshold).toString()); + tableProperty.buildTimeSeriesCompactionFileCountThreshold(); + } + + public Long getTimeSeriesCompactionFileCountThreshold() { + if (tableProperty != null) { + return tableProperty.timeSeriesCompactionFileCountThreshold(); + } + return null; + } + + public void setTimeSeriesCompactionTimeThresholdSeconds(long timeSeriesCompactionTimeThresholdSeconds) { + TableProperty tableProperty = getOrCreatTableProperty(); + tableProperty.modifyTableProperties(PropertyAnalyzer + .PROPERTIES_TIME_SERIES_COMPACTION_TIME_THRESHOLD_SECONDS, + Long.valueOf(timeSeriesCompactionTimeThresholdSeconds).toString()); + tableProperty.buildTimeSeriesCompactionTimeThresholdSeconds(); + } + + public Long getTimeSeriesCompactionTimeThresholdSeconds() { + if (tableProperty != null) { + return tableProperty.timeSeriesCompactionTimeThresholdSeconds(); + } + return null; + } + public Boolean isDynamicSchema() { if (tableProperty != null) { return tableProperty.isDynamicSchema(); diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/catalog/TableProperty.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/TableProperty.java index bced77425b..c8b6cdb53e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/TableProperty.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/TableProperty.java @@ -86,6 +86,17 @@ public class TableProperty implements Writable { private boolean skipWriteIndexOnLoad = false; + private String compactionPolicy = PropertyAnalyzer.SIZE_BASED_COMPACTION_POLICY; + + private long timeSeriesCompactionGoalSizeMbytes + = PropertyAnalyzer.TIME_SERIES_COMPACTION_GOAL_SIZE_MBYTES_DEFAULT_VALUE; + + private long timeSeriesCompactionFileCountThreshold + = PropertyAnalyzer.TIME_SERIES_COMPACTION_FILE_COUNT_THRESHOLD_DEFAULT_VALUE; + + private long timeSeriesCompactionTimeThresholdSeconds + = PropertyAnalyzer.TIME_SERIES_COMPACTION_TIME_THRESHOLD_SECONDS_DEFAULT_VALUE; + private DataSortInfo dataSortInfo = new DataSortInfo(); public TableProperty(Map<String, String> properties) { @@ -214,6 +225,51 @@ public class TableProperty implements Writable { return skipWriteIndexOnLoad; } + public TableProperty buildCompactionPolicy() { + compactionPolicy = properties.getOrDefault(PropertyAnalyzer.PROPERTIES_COMPACTION_POLICY, + PropertyAnalyzer.SIZE_BASED_COMPACTION_POLICY); + return this; + } + + public String compactionPolicy() { + return compactionPolicy; + } + + public TableProperty buildTimeSeriesCompactionGoalSizeMbytes() { + timeSeriesCompactionGoalSizeMbytes = Long.parseLong(properties + .getOrDefault(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_GOAL_SIZE_MBYTES, + String.valueOf(PropertyAnalyzer.TIME_SERIES_COMPACTION_GOAL_SIZE_MBYTES_DEFAULT_VALUE))); + return this; + } + + public long timeSeriesCompactionGoalSizeMbytes() { + return timeSeriesCompactionGoalSizeMbytes; + } + + public TableProperty buildTimeSeriesCompactionFileCountThreshold() { + timeSeriesCompactionFileCountThreshold = Long.parseLong(properties 
+ .getOrDefault(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_FILE_COUNT_THRESHOLD, + String.valueOf(PropertyAnalyzer.TIME_SERIES_COMPACTION_FILE_COUNT_THRESHOLD_DEFAULT_VALUE))); + + return this; + } + + public long timeSeriesCompactionFileCountThreshold() { + return timeSeriesCompactionFileCountThreshold; + } + + public TableProperty buildTimeSeriesCompactionTimeThresholdSeconds() { + timeSeriesCompactionTimeThresholdSeconds = Long.parseLong(properties + .getOrDefault(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_TIME_THRESHOLD_SECONDS, + String.valueOf(PropertyAnalyzer.TIME_SERIES_COMPACTION_TIME_THRESHOLD_SECONDS_DEFAULT_VALUE))); + + return this; + } + + public long timeSeriesCompactionTimeThresholdSeconds() { + return timeSeriesCompactionTimeThresholdSeconds; + } + public TableProperty buildStoragePolicy() { storagePolicy = properties.getOrDefault(PropertyAnalyzer.PROPERTIES_STORAGE_POLICY, ""); return this; @@ -445,6 +501,10 @@ public class TableProperty implements Writable { .buildEnableLightSchemaChange() .buildStoreRowColumn() .buildSkipWriteIndexOnLoad() + .buildCompactionPolicy() + .buildTimeSeriesCompactionGoalSizeMbytes() + .buildTimeSeriesCompactionFileCountThreshold() + .buildTimeSeriesCompactionTimeThresholdSeconds() .buildDisableAutoCompaction() .buildEnableSingleReplicaCompaction(); if (Env.getCurrentEnvJournalVersion() < FeMetaVersion.VERSION_105) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java index b80d511256..c3570ff590 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java @@ -127,6 +127,16 @@ public class PropertyAnalyzer { public static final String PROPERTIES_SKIP_WRITE_INDEX_ON_LOAD = "skip_write_index_on_load"; + public static final String PROPERTIES_COMPACTION_POLICY = "compaction_policy"; + + 
public static final String PROPERTIES_TIME_SERIES_COMPACTION_GOAL_SIZE_MBYTES = + "time_series_compaction_goal_size_mbytes"; + + public static final String PROPERTIES_TIME_SERIES_COMPACTION_FILE_COUNT_THRESHOLD = + "time_series_compaction_file_count_threshold"; + + public static final String PROPERTIES_TIME_SERIES_COMPACTION_TIME_THRESHOLD_SECONDS = + "time_series_compaction_time_threshold_seconds"; public static final String PROPERTIES_MUTABLE = "mutable"; public static final String PROPERTIES_IS_BEING_SYNCED = "is_being_synced"; @@ -152,6 +162,16 @@ public class PropertyAnalyzer { private static final double MAX_FPP = 0.05; private static final double MIN_FPP = 0.0001; + // compaction policy + public static final String SIZE_BASED_COMPACTION_POLICY = "size_based"; + public static final String TIME_SERIES_COMPACTION_POLICY = "time_series"; + public static final long TIME_SERIES_COMPACTION_GOAL_SIZE_MBYTES_DEFAULT_VALUE = 1024; + public static final long TIME_SERIES_COMPACTION_FILE_COUNT_THRESHOLD_DEFAULT_VALUE = 2000; + public static final long TIME_SERIES_COMPACTION_TIME_THRESHOLD_SECONDS_DEFAULT_VALUE = 3600; + + + + /** * check and replace members of DataProperty by properties. 
* @@ -595,6 +615,94 @@ public class PropertyAnalyzer { + " must be `true` or `false`"); } + public static String analyzeCompactionPolicy(Map<String, String> properties) throws AnalysisException { + if (properties == null || properties.isEmpty()) { + return SIZE_BASED_COMPACTION_POLICY; + } + String compactionPolicy = SIZE_BASED_COMPACTION_POLICY; + if (properties.containsKey(PROPERTIES_COMPACTION_POLICY)) { + compactionPolicy = properties.get(PROPERTIES_COMPACTION_POLICY); + properties.remove(PROPERTIES_COMPACTION_POLICY); + if (compactionPolicy != null && !compactionPolicy.equals(TIME_SERIES_COMPACTION_POLICY) + && !compactionPolicy.equals(SIZE_BASED_COMPACTION_POLICY)) { + throw new AnalysisException(PROPERTIES_COMPACTION_POLICY + + " must be " + TIME_SERIES_COMPACTION_POLICY + " or " + SIZE_BASED_COMPACTION_POLICY); + } + } + + return compactionPolicy; + } + + public static long analyzeTimeSeriesCompactionGoalSizeMbytes(Map<String, String> properties) + throws AnalysisException { + long goalSizeMbytes = TIME_SERIES_COMPACTION_GOAL_SIZE_MBYTES_DEFAULT_VALUE; + if (properties == null || properties.isEmpty()) { + return goalSizeMbytes; + } + if (properties.containsKey(PROPERTIES_TIME_SERIES_COMPACTION_GOAL_SIZE_MBYTES)) { + String goalSizeMbytesStr = properties.get(PROPERTIES_TIME_SERIES_COMPACTION_GOAL_SIZE_MBYTES); + properties.remove(PROPERTIES_TIME_SERIES_COMPACTION_GOAL_SIZE_MBYTES); + try { + goalSizeMbytes = Long.parseLong(goalSizeMbytesStr); + if (goalSizeMbytes < 10) { + throw new AnalysisException("time_series_compaction_goal_size_mbytes can not be" + + " less than 10: " + goalSizeMbytesStr); + } + } catch (NumberFormatException e) { + throw new AnalysisException("Invalid time_series_compaction_goal_size_mbytes format: " + + goalSizeMbytesStr); + } + } + return goalSizeMbytes; + } + + public static long analyzeTimeSeriesCompactionFileCountThreshold(Map<String, String> properties) + throws AnalysisException { + long fileCountThreshold = 
TIME_SERIES_COMPACTION_FILE_COUNT_THRESHOLD_DEFAULT_VALUE; + if (properties == null || properties.isEmpty()) { + return fileCountThreshold; + } + if (properties.containsKey(PROPERTIES_TIME_SERIES_COMPACTION_FILE_COUNT_THRESHOLD)) { + String fileCountThresholdStr = properties + .get(PROPERTIES_TIME_SERIES_COMPACTION_FILE_COUNT_THRESHOLD); + properties.remove(PROPERTIES_TIME_SERIES_COMPACTION_FILE_COUNT_THRESHOLD); + try { + fileCountThreshold = Long.parseLong(fileCountThresholdStr); + if (fileCountThreshold < 10) { + throw new AnalysisException("time_series_compaction_file_count_threshold can not be " + + "less than 10: " + fileCountThresholdStr); + } + } catch (NumberFormatException e) { + throw new AnalysisException("Invalid time_series_compaction_file_count_threshold format: " + + fileCountThresholdStr); + } + } + return fileCountThreshold; + } + + public static long analyzeTimeSeriesCompactionTimeThresholdSeconds(Map<String, String> properties) + throws AnalysisException { + long timeThresholdSeconds = TIME_SERIES_COMPACTION_TIME_THRESHOLD_SECONDS_DEFAULT_VALUE; + if (properties == null || properties.isEmpty()) { + return timeThresholdSeconds; + } + if (properties.containsKey(PROPERTIES_TIME_SERIES_COMPACTION_TIME_THRESHOLD_SECONDS)) { + String timeThresholdSecondsStr = properties.get(PROPERTIES_TIME_SERIES_COMPACTION_TIME_THRESHOLD_SECONDS); + properties.remove(PROPERTIES_TIME_SERIES_COMPACTION_TIME_THRESHOLD_SECONDS); + try { + timeThresholdSeconds = Long.parseLong(timeThresholdSecondsStr); + if (timeThresholdSeconds < 60) { + throw new AnalysisException("time_series_compaction_time_threshold_seconds can not be" + + " less than 60: " + timeThresholdSecondsStr); + } + } catch (NumberFormatException e) { + throw new AnalysisException("Invalid time_series_compaction_time_threshold_seconds format: " + + timeThresholdSecondsStr); + } + } + return timeThresholdSeconds; + } + // analyzeCompressionType will parse the compression type from properties public static 
TCompressionType analyzeCompressionType(Map<String, String> properties) throws AnalysisException { String compressionType = ""; diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java index 6c1079688d..46c89df7ba 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java @@ -1434,6 +1434,21 @@ public class InternalCatalog implements CatalogIf<Database> { properties.put(PropertyAnalyzer.PROPERTIES_SKIP_WRITE_INDEX_ON_LOAD, olapTable.skipWriteIndexOnLoad().toString()); } + if (!properties.containsKey(PropertyAnalyzer.PROPERTIES_COMPACTION_POLICY)) { + properties.put(PropertyAnalyzer.PROPERTIES_COMPACTION_POLICY, olapTable.getCompactionPolicy()); + } + if (!properties.containsKey(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_GOAL_SIZE_MBYTES)) { + properties.put(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_GOAL_SIZE_MBYTES, + olapTable.getTimeSeriesCompactionGoalSizeMbytes().toString()); + } + if (!properties.containsKey(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_FILE_COUNT_THRESHOLD)) { + properties.put(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_FILE_COUNT_THRESHOLD, + olapTable.getTimeSeriesCompactionFileCountThreshold().toString()); + } + if (!properties.containsKey(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_TIME_THRESHOLD_SECONDS)) { + properties.put(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_TIME_THRESHOLD_SECONDS, + olapTable.getTimeSeriesCompactionTimeThresholdSeconds().toString()); + } if (!properties.containsKey(PropertyAnalyzer.PROPERTIES_DYNAMIC_SCHEMA)) { properties.put(PropertyAnalyzer.PROPERTIES_DYNAMIC_SCHEMA, olapTable.isDynamicSchema().toString()); @@ -1530,7 +1545,11 @@ public class InternalCatalog implements CatalogIf<Database> { singlePartitionDesc.getTabletType(), 
olapTable.getCompressionType(), olapTable.getDataSortInfo(), olapTable.getEnableUniqueKeyMergeOnWrite(), storagePolicy, idGeneratorBuffer, olapTable.disableAutoCompaction(), olapTable.enableSingleReplicaCompaction(), - olapTable.skipWriteIndexOnLoad(), olapTable.storeRowColumn(), olapTable.isDynamicSchema(), + olapTable.skipWriteIndexOnLoad(), olapTable.getCompactionPolicy(), + olapTable.getTimeSeriesCompactionGoalSizeMbytes(), + olapTable.getTimeSeriesCompactionFileCountThreshold(), + olapTable.getTimeSeriesCompactionTimeThresholdSeconds(), + olapTable.storeRowColumn(), olapTable.isDynamicSchema(), binlogConfig, dataProperty.isStorageMediumSpecified()); // check again @@ -1758,6 +1777,8 @@ public class InternalCatalog implements CatalogIf<Database> { DataSortInfo dataSortInfo, boolean enableUniqueKeyMergeOnWrite, String storagePolicy, IdGeneratorBuffer idGeneratorBuffer, boolean disableAutoCompaction, boolean enableSingleReplicaCompaction, boolean skipWriteIndexOnLoad, + String compactionPolicy, Long timeSeriesCompactionGoalSizeMbytes, + Long timeSeriesCompactionFileCountThreshold, Long timeSeriesCompactionTimeThresholdSeconds, boolean storeRowColumn, boolean isDynamicSchema, BinlogConfig binlogConfig, boolean isStorageMediumSpecified) throws DdlException { // create base index first. 
@@ -1822,6 +1843,8 @@ public class InternalCatalog implements CatalogIf<Database> { storageMedium, schema, bfColumns, bfFpp, countDownLatch, indexes, isInMemory, tabletType, dataSortInfo, compressionType, enableUniqueKeyMergeOnWrite, storagePolicy, disableAutoCompaction, enableSingleReplicaCompaction, skipWriteIndexOnLoad, + compactionPolicy, timeSeriesCompactionGoalSizeMbytes, + timeSeriesCompactionFileCountThreshold, timeSeriesCompactionTimeThresholdSeconds, storeRowColumn, isDynamicSchema, binlogConfig); task.setStorageFormat(storageFormat); @@ -1982,6 +2005,56 @@ public class InternalCatalog implements CatalogIf<Database> { // use light schema change optimization olapTable.setDisableAutoCompaction(disableAutoCompaction); + // set compaction policy + String compactionPolicy = PropertyAnalyzer.SIZE_BASED_COMPACTION_POLICY; + try { + compactionPolicy = PropertyAnalyzer.analyzeCompactionPolicy(properties); + } catch (AnalysisException e) { + throw new DdlException(e.getMessage()); + } + olapTable.setCompactionPolicy(compactionPolicy); + + if (!compactionPolicy.equals(PropertyAnalyzer.TIME_SERIES_COMPACTION_POLICY) + && (properties.containsKey(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_GOAL_SIZE_MBYTES) + || properties.containsKey(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_FILE_COUNT_THRESHOLD) + || properties + .containsKey(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_TIME_THRESHOLD_SECONDS))) { + throw new DdlException("only time series compaction policy support for time series config"); + } + + // set time series compaction goal size + long timeSeriesCompactionGoalSizeMbytes + = PropertyAnalyzer.TIME_SERIES_COMPACTION_GOAL_SIZE_MBYTES_DEFAULT_VALUE; + try { + timeSeriesCompactionGoalSizeMbytes = PropertyAnalyzer + .analyzeTimeSeriesCompactionGoalSizeMbytes(properties); + } catch (AnalysisException e) { + throw new DdlException(e.getMessage()); + } + olapTable.setTimeSeriesCompactionGoalSizeMbytes(timeSeriesCompactionGoalSizeMbytes); + + // 
set time series compaction file count threshold + long timeSeriesCompactionFileCountThreshold + = PropertyAnalyzer.TIME_SERIES_COMPACTION_FILE_COUNT_THRESHOLD_DEFAULT_VALUE; + try { + timeSeriesCompactionFileCountThreshold = PropertyAnalyzer + .analyzeTimeSeriesCompactionFileCountThreshold(properties); + } catch (AnalysisException e) { + throw new DdlException(e.getMessage()); + } + olapTable.setTimeSeriesCompactionFileCountThreshold(timeSeriesCompactionFileCountThreshold); + + // set time series compaction time threshold + long timeSeriesCompactionTimeThresholdSeconds + = PropertyAnalyzer.TIME_SERIES_COMPACTION_TIME_THRESHOLD_SECONDS_DEFAULT_VALUE; + try { + timeSeriesCompactionTimeThresholdSeconds = PropertyAnalyzer + .analyzeTimeSeriesCompactionTimeThresholdSeconds(properties); + } catch (AnalysisException e) { + throw new DdlException(e.getMessage()); + } + olapTable.setTimeSeriesCompactionTimeThresholdSeconds(timeSeriesCompactionTimeThresholdSeconds); + // get storage format TStorageFormat storageFormat = TStorageFormat.V2; // default is segment v2 try { @@ -2307,6 +2380,9 @@ public class InternalCatalog implements CatalogIf<Database> { olapTable.getDataSortInfo(), olapTable.getEnableUniqueKeyMergeOnWrite(), storagePolicy, idGeneratorBuffer, olapTable.disableAutoCompaction(), olapTable.enableSingleReplicaCompaction(), skipWriteIndexOnLoad, + olapTable.getCompactionPolicy(), olapTable.getTimeSeriesCompactionGoalSizeMbytes(), + olapTable.getTimeSeriesCompactionFileCountThreshold(), + olapTable.getTimeSeriesCompactionTimeThresholdSeconds(), storeRowColumn, isDynamicSchema, binlogConfigForTask, partitionInfo.getDataProperty(partitionId).isStorageMediumSpecified()); olapTable.addPartition(partition); @@ -2373,8 +2449,12 @@ public class InternalCatalog implements CatalogIf<Database> { storageFormat, partitionInfo.getTabletType(entry.getValue()), compressionType, olapTable.getDataSortInfo(), olapTable.getEnableUniqueKeyMergeOnWrite(), storagePolicy, 
idGeneratorBuffer, olapTable.disableAutoCompaction(), - olapTable.enableSingleReplicaCompaction(), skipWriteIndexOnLoad, storeRowColumn, - isDynamicSchema, binlogConfigForTask, dataProperty.isStorageMediumSpecified()); + olapTable.enableSingleReplicaCompaction(), skipWriteIndexOnLoad, + olapTable.getCompactionPolicy(), olapTable.getTimeSeriesCompactionGoalSizeMbytes(), + olapTable.getTimeSeriesCompactionFileCountThreshold(), + olapTable.getTimeSeriesCompactionTimeThresholdSeconds(), + storeRowColumn, isDynamicSchema, binlogConfigForTask, + dataProperty.isStorageMediumSpecified()); olapTable.addPartition(partition); } } else { @@ -2797,6 +2877,9 @@ public class InternalCatalog implements CatalogIf<Database> { olapTable.getPartitionInfo().getDataProperty(oldPartitionId).getStoragePolicy(), idGeneratorBuffer, olapTable.disableAutoCompaction(), olapTable.enableSingleReplicaCompaction(), olapTable.skipWriteIndexOnLoad(), + olapTable.getCompactionPolicy(), olapTable.getTimeSeriesCompactionGoalSizeMbytes(), + olapTable.getTimeSeriesCompactionFileCountThreshold(), + olapTable.getTimeSeriesCompactionTimeThresholdSeconds(), olapTable.storeRowColumn(), olapTable.isDynamicSchema(), binlogConfig, copiedTbl.getPartitionInfo().getDataProperty(oldPartitionId).isStorageMediumSpecified()); newPartitions.add(newPartition); diff --git a/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java b/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java index 17be05444e..055652a0b9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java @@ -809,7 +809,10 @@ public class ReportHandler extends Daemon { olapTable.getEnableUniqueKeyMergeOnWrite(), olapTable.getStoragePolicy(), olapTable.disableAutoCompaction(), olapTable.enableSingleReplicaCompaction(), - olapTable.skipWriteIndexOnLoad(), + olapTable.skipWriteIndexOnLoad(), olapTable.getCompactionPolicy(), + 
olapTable.getTimeSeriesCompactionGoalSizeMbytes(), + olapTable.getTimeSeriesCompactionFileCountThreshold(), + olapTable.getTimeSeriesCompactionTimeThresholdSeconds(), olapTable.storeRowColumn(), olapTable.isDynamicSchema(), binlogConfig); diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/CreateReplicaTask.java b/fe/fe-core/src/main/java/org/apache/doris/task/CreateReplicaTask.java index 49bd33fb5c..f654c16784 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/task/CreateReplicaTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/task/CreateReplicaTask.java @@ -105,6 +105,14 @@ public class CreateReplicaTask extends AgentTask { private boolean skipWriteIndexOnLoad; + private String compactionPolicy; + + private long timeSeriesCompactionGoalSizeMbytes; + + private long timeSeriesCompactionFileCountThreshold; + + private long timeSeriesCompactionTimeThresholdSeconds; + private boolean storeRowColumn; private BinlogConfig binlogConfig; @@ -123,6 +131,10 @@ public class CreateReplicaTask extends AgentTask { String storagePolicy, boolean disableAutoCompaction, boolean enableSingleReplicaCompaction, boolean skipWriteIndexOnLoad, + String compactionPolicy, + long timeSeriesCompactionGoalSizeMbytes, + long timeSeriesCompactionFileCountThreshold, + long timeSeriesCompactionTimeThresholdSeconds, boolean storeRowColumn, boolean isDynamicSchema, BinlogConfig binlogConfig) { @@ -162,6 +174,10 @@ public class CreateReplicaTask extends AgentTask { this.disableAutoCompaction = disableAutoCompaction; this.enableSingleReplicaCompaction = enableSingleReplicaCompaction; this.skipWriteIndexOnLoad = skipWriteIndexOnLoad; + this.compactionPolicy = compactionPolicy; + this.timeSeriesCompactionGoalSizeMbytes = timeSeriesCompactionGoalSizeMbytes; + this.timeSeriesCompactionFileCountThreshold = timeSeriesCompactionFileCountThreshold; + this.timeSeriesCompactionTimeThresholdSeconds = timeSeriesCompactionTimeThresholdSeconds; this.storeRowColumn = storeRowColumn; 
this.binlogConfig = binlogConfig; } @@ -270,7 +286,6 @@ public class CreateReplicaTask extends AgentTask { tSchema.setDisableAutoCompaction(disableAutoCompaction); tSchema.setEnableSingleReplicaCompaction(enableSingleReplicaCompaction); tSchema.setSkipWriteIndexOnLoad(skipWriteIndexOnLoad); - tSchema.setSkipWriteIndexOnLoad(skipWriteIndexOnLoad); tSchema.setStoreRowColumn(storeRowColumn); tSchema.setIsDynamicSchema(isDynamicSchema); createTabletReq.setTabletSchema(tSchema); @@ -300,6 +315,10 @@ public class CreateReplicaTask extends AgentTask { createTabletReq.setTabletType(tabletType); createTabletReq.setCompressionType(compressionType); createTabletReq.setEnableUniqueKeyMergeOnWrite(enableUniqueKeyMergeOnWrite); + createTabletReq.setCompactionPolicy(compactionPolicy); + createTabletReq.setTimeSeriesCompactionGoalSizeMbytes(timeSeriesCompactionGoalSizeMbytes); + createTabletReq.setTimeSeriesCompactionFileCountThreshold(timeSeriesCompactionFileCountThreshold); + createTabletReq.setTimeSeriesCompactionTimeThresholdSeconds(timeSeriesCompactionTimeThresholdSeconds); if (binlogConfig != null) { createTabletReq.setBinlogConfig(binlogConfig.toThrift()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/UpdateTabletMetaInfoTask.java b/fe/fe-core/src/main/java/org/apache/doris/task/UpdateTabletMetaInfoTask.java index 29ad3ce199..6b12c34e0d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/task/UpdateTabletMetaInfoTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/task/UpdateTabletMetaInfoTask.java @@ -21,6 +21,7 @@ import org.apache.doris.catalog.BinlogConfig; import org.apache.doris.common.MarkedCountDownLatch; import org.apache.doris.common.Pair; import org.apache.doris.common.Status; +import org.apache.doris.common.util.PropertyAnalyzer; import org.apache.doris.thrift.TStatusCode; import org.apache.doris.thrift.TTabletMetaInfo; import org.apache.doris.thrift.TTaskType; @@ -30,6 +31,7 @@ import org.apache.logging.log4j.LogManager; import 
org.apache.logging.log4j.Logger; import java.util.List; +import java.util.Map; import java.util.Random; import java.util.Set; @@ -44,6 +46,8 @@ public class UpdateTabletMetaInfoTask extends AgentTask { private int inMemory = -1; // < 0 means not to update inMemory property, > 0 means true, == 0 means false private long storagePolicyId = -1; // < 0 means not to update storage policy, == 0 means to reset storage policy private BinlogConfig binlogConfig = null; // null means not to update binlog config + private String compactionPolicy = null; // null means not to update compaction policy + private Map<String, Long> timeSeriesCompactionConfig = null; // null means not to update compaction policy config // For ReportHandler private List<TTabletMetaInfo> tabletMetaInfos; @@ -72,6 +76,22 @@ public class UpdateTabletMetaInfoTask extends AgentTask { this.tabletMetaInfos = tabletMetaInfos; } + public UpdateTabletMetaInfoTask(long backendId, + Set<Pair<Long, Integer>> tableIdWithSchemaHash, + int inMemory, long storagePolicyId, + BinlogConfig binlogConfig, + MarkedCountDownLatch<Long, Set<Pair<Long, Integer>>> latch, + String compactionPolicy, + Map<String, Long> timeSeriesCompactionConfig) { + this(backendId, tableIdWithSchemaHash); + this.storagePolicyId = storagePolicyId; + this.inMemory = inMemory; + this.binlogConfig = binlogConfig; + this.latch = latch; + this.compactionPolicy = compactionPolicy; + this.timeSeriesCompactionConfig = timeSeriesCompactionConfig; + } + public void countDownLatch(long backendId, Set<Pair<Long, Integer>> tablets) { if (this.latch != null) { if (latch.markedCountDown(backendId, tablets)) { @@ -110,6 +130,26 @@ public class UpdateTabletMetaInfoTask extends AgentTask { if (binlogConfig != null) { metaInfo.setBinlogConfig(binlogConfig.toThrift()); } + if (compactionPolicy != null) { + metaInfo.setCompactionPolicy(compactionPolicy); + } + if (timeSeriesCompactionConfig != null && !timeSeriesCompactionConfig.isEmpty()) { + if 
(timeSeriesCompactionConfig + .containsKey(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_GOAL_SIZE_MBYTES)) { + metaInfo.setTimeSeriesCompactionGoalSizeMbytes(timeSeriesCompactionConfig + .get(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_GOAL_SIZE_MBYTES)); + } + if (timeSeriesCompactionConfig + .containsKey(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_FILE_COUNT_THRESHOLD)) { + metaInfo.setTimeSeriesCompactionFileCountThreshold(timeSeriesCompactionConfig + .get(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_FILE_COUNT_THRESHOLD)); + } + if (timeSeriesCompactionConfig + .containsKey(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_TIME_THRESHOLD_SECONDS)) { + metaInfo.setTimeSeriesCompactionTimeThresholdSeconds(timeSeriesCompactionConfig + .get(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_TIME_THRESHOLD_SECONDS)); + } + } updateTabletMetaInfoReq.addToTabletMetaInfos(metaInfo); } } else { diff --git a/fe/fe-core/src/test/java/org/apache/doris/task/AgentTaskTest.java b/fe/fe-core/src/test/java/org/apache/doris/task/AgentTaskTest.java index cf19d7d918..86d3fda79f 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/task/AgentTaskTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/task/AgentTaskTest.java @@ -107,7 +107,7 @@ public class AgentTaskTest { createReplicaTask = new CreateReplicaTask(backendId1, dbId, tableId, partitionId, indexId1, tabletId1, replicaId1, shortKeyNum, schemaHash1, version, KeysType.AGG_KEYS, storageType, TStorageMedium.SSD, columns, null, 0, latch, null, false, TTabletType.TABLET_TYPE_DISK, null, - TCompressionType.LZ4F, false, "", false, false, false, false, false, null); + TCompressionType.LZ4F, false, "", false, false, false, "", 0, 0, 0, false, false, null); // drop dropTask = new DropReplicaTask(backendId1, tabletId1, replicaId1, schemaHash1, false); diff --git a/gensrc/proto/olap_file.proto b/gensrc/proto/olap_file.proto index fe894dba9d..7c0c83a4a0 100644 --- a/gensrc/proto/olap_file.proto +++ 
b/gensrc/proto/olap_file.proto @@ -311,6 +311,10 @@ message TabletMetaPB { optional int64 storage_policy_id = 25; optional PUniqueId cooldown_meta_id = 26; optional BinlogConfigPB binlog_config = 27; + optional string compaction_policy = 28 [default = "size_based"]; + optional int64 time_series_compaction_goal_size_mbytes = 29 [default = 1024]; + optional int64 time_series_compaction_file_count_threshold = 30 [default = 2000]; + optional int64 time_series_compaction_time_threshold_seconds = 31 [default = 3600]; } message OLAPRawDeltaHeaderMessage { diff --git a/gensrc/thrift/AgentService.thrift b/gensrc/thrift/AgentService.thrift index f92f873fc4..e5a94cbaba 100644 --- a/gensrc/thrift/AgentService.thrift +++ b/gensrc/thrift/AgentService.thrift @@ -141,6 +141,10 @@ struct TCreateTabletReq { 19: optional bool enable_unique_key_merge_on_write = false 20: optional i64 storage_policy_id 21: optional TBinlogConfig binlog_config + 22: optional string compaction_policy = "size_based" + 23: optional i64 time_series_compaction_goal_size_mbytes = 1024 + 24: optional i64 time_series_compaction_file_count_threshold = 2000 + 25: optional i64 time_series_compaction_time_threshold_seconds = 3600 } struct TDropTabletReq { @@ -401,6 +405,10 @@ struct TTabletMetaInfo { 7: optional i64 storage_policy_id 8: optional Types.TReplicaId replica_id 9: optional TBinlogConfig binlog_config + 10: optional string compaction_policy + 11: optional i64 time_series_compaction_goal_size_mbytes + 12: optional i64 time_series_compaction_file_count_threshold + 13: optional i64 time_series_compaction_time_threshold_seconds } struct TUpdateTabletMetaInfoReq { diff --git a/regression-test/suites/compaction/test_table_level_compaction_policy.groovy b/regression-test/suites/compaction/test_table_level_compaction_policy.groovy new file mode 100644 index 0000000000..838d8b34dc --- /dev/null +++ b/regression-test/suites/compaction/test_table_level_compaction_policy.groovy @@ -0,0 +1,207 @@ +// Licensed to the 
Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import org.codehaus.groovy.runtime.IOGroovyMethods + +suite("test_table_level_compaction_policy") { + def tableName = "test_table_level_compaction_policy" + sql """ DROP TABLE IF EXISTS ${tableName} """ + + sql """ + CREATE TABLE ${tableName} ( + `c_custkey` int(11) NOT NULL COMMENT "", + `c_name` varchar(26) NOT NULL COMMENT "", + `c_address` varchar(41) NOT NULL COMMENT "", + `c_city` varchar(11) NOT NULL COMMENT "" + ) + DUPLICATE KEY (`c_custkey`) + DISTRIBUTED BY HASH(`c_custkey`) BUCKETS 1 + PROPERTIES ( + "replication_num" = "1", + "compaction_policy" = "time_series", + "time_series_compaction_goal_size_mbytes" = "2048", + "time_series_compaction_file_count_threshold" = "5000", + "time_series_compaction_time_threshold_seconds" = "86400" + ); + """ + result = sql """show create table ${tableName}""" + logger.info("${result}") + assertTrue(result.toString().containsIgnoreCase('"compaction_policy" = "time_series"')) + assertTrue(result.toString().containsIgnoreCase('"time_series_compaction_goal_size_mbytes" = "2048"')) + assertTrue(result.toString().containsIgnoreCase('"time_series_compaction_file_count_threshold" = "5000"')) + 
assertTrue(result.toString().containsIgnoreCase('"time_series_compaction_time_threshold_seconds" = "86400"')) + + sql """ + alter table ${tableName} set ("time_series_compaction_goal_size_mbytes" = "1024") + """ + + result = sql """show create table ${tableName}""" + logger.info("${result}") + assertTrue(result.toString().containsIgnoreCase('"time_series_compaction_goal_size_mbytes" = "1024"')) + + sql """ + alter table ${tableName} set ("time_series_compaction_file_count_threshold" = "6000") + """ + + result = sql """show create table ${tableName}""" + logger.info("${result}") + assertTrue(result.toString().containsIgnoreCase('"time_series_compaction_file_count_threshold" = "6000"')) + + sql """ + alter table ${tableName} set ("time_series_compaction_time_threshold_seconds" = "3000") + """ + + result = sql """show create table ${tableName}""" + logger.info("${result}") + assertTrue(result.toString().containsIgnoreCase('"time_series_compaction_time_threshold_seconds" = "3000"')) + + sql """ DROP TABLE IF EXISTS ${tableName} """ + + sql """ + CREATE TABLE ${tableName} ( + `c_custkey` int(11) NOT NULL COMMENT "", + `c_name` varchar(26) NOT NULL COMMENT "", + `c_address` varchar(41) NOT NULL COMMENT "", + `c_city` varchar(11) NOT NULL COMMENT "" + ) + DUPLICATE KEY (`c_custkey`) + DISTRIBUTED BY HASH(`c_custkey`) BUCKETS 1 + PROPERTIES ( + "replication_num" = "1" + ); + """ + result = sql """show create table ${tableName}""" + logger.info("${result}") + assertFalse(result.toString().containsIgnoreCase('"compaction_policy"')) + + sql """ DROP TABLE IF EXISTS ${tableName} """ + + test { + sql """ + CREATE TABLE ${tableName} ( + `c_custkey` int(11) NOT NULL COMMENT "", + `c_name` varchar(26) NOT NULL COMMENT "", + `c_address` varchar(41) NOT NULL COMMENT "", + `c_city` varchar(11) NOT NULL COMMENT "" + ) + DUPLICATE KEY (`c_custkey`) + DISTRIBUTED BY HASH(`c_custkey`) BUCKETS 1 + PROPERTIES ( + "replication_num" = "1", + "compaction_policy" = "time_series", + 
"time_series_compaction_goal_size_mbytes" = "5" + ); + """ + exception "time_series_compaction_goal_size_mbytes can not be less than 10: 5" + } + + test { + sql """ + CREATE TABLE ${tableName} ( + `c_custkey` int(11) NOT NULL COMMENT "", + `c_name` varchar(26) NOT NULL COMMENT "", + `c_address` varchar(41) NOT NULL COMMENT "", + `c_city` varchar(11) NOT NULL COMMENT "" + ) + DUPLICATE KEY (`c_custkey`) + DISTRIBUTED BY HASH(`c_custkey`) BUCKETS 1 + PROPERTIES ( + "replication_num" = "1", + "compaction_policy" = "time_series", + "time_series_compaction_file_count_threshold" = "5" + ); + """ + exception "time_series_compaction_file_count_threshold can not be less than 10: 5" + } + + test { + sql """ + CREATE TABLE ${tableName} ( + `c_custkey` int(11) NOT NULL COMMENT "", + `c_name` varchar(26) NOT NULL COMMENT "", + `c_address` varchar(41) NOT NULL COMMENT "", + `c_city` varchar(11) NOT NULL COMMENT "" + ) + DUPLICATE KEY (`c_custkey`) + DISTRIBUTED BY HASH(`c_custkey`) BUCKETS 1 + PROPERTIES ( + "replication_num" = "1", + "compaction_policy" = "time_series", + "time_series_compaction_time_threshold_seconds" = "5" + ); + """ + exception "time_series_compaction_time_threshold_seconds can not be less than 60: 5" + } + + test { + sql """ + CREATE TABLE ${tableName} ( + `c_custkey` int(11) NOT NULL COMMENT "", + `c_name` varchar(26) NOT NULL COMMENT "", + `c_address` varchar(41) NOT NULL COMMENT "", + `c_city` varchar(11) NOT NULL COMMENT "" + ) + DUPLICATE KEY (`c_custkey`) + DISTRIBUTED BY HASH(`c_custkey`) BUCKETS 1 + PROPERTIES ( + "replication_num" = "1", + "compaction_policy" = "ok" + ); + """ + exception "compaction_policy must be time_series or size_based" + } + + test { + sql """ + CREATE TABLE ${tableName} ( + `c_custkey` int(11) NOT NULL COMMENT "", + `c_name` varchar(26) NOT NULL COMMENT "", + `c_address` varchar(41) NOT NULL COMMENT "", + `c_city` varchar(11) NOT NULL COMMENT "" + ) + DUPLICATE KEY (`c_custkey`) + DISTRIBUTED BY HASH(`c_custkey`) BUCKETS 1 + 
PROPERTIES ( + "replication_num" = "1", + "time_series_compaction_goal_size_mbytes" = "2048" + ); + """ + exception "only time series compaction policy support for time series config" + } + + test { + sql """ + CREATE TABLE ${tableName} ( + `c_custkey` int(11) NOT NULL COMMENT "", + `c_name` varchar(26) NOT NULL COMMENT "", + `c_address` varchar(41) NOT NULL COMMENT "", + `c_city` varchar(11) NOT NULL COMMENT "" + ) + DUPLICATE KEY (`c_custkey`) + DISTRIBUTED BY HASH(`c_custkey`) BUCKETS 1 + PROPERTIES ( + "replication_num" = "1" + ); + """ + + sql """ + alter table ${tableName} set ("compaction_policy" = "ok") + """ + exception "Table compaction policy only support for time_series or size_based" + } + sql """ DROP TABLE IF EXISTS ${tableName} """ +} --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
