This is an automated email from the ASF dual-hosted git repository.
jianliangqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 19d1f49fbe [improvement](compaction) compaction policy and options in
the properties of a table (#22461)
19d1f49fbe is described below
commit 19d1f49fbeccb90fe457d10bd425af3ebec558fb
Author: Chenyang Sun <[email protected]>
AuthorDate: Tue Aug 1 22:02:23 2023 +0800
[improvement](compaction) compaction policy and options in the properties
of a table (#22461)
---
be/src/agent/task_worker_pool.cpp | 41 ++++
be/src/common/config.cpp | 14 --
be/src/common/config.h | 11 --
be/src/http/action/compaction_action.cpp | 4 +-
be/src/olap/compaction.cpp | 6 +-
be/src/olap/cumulative_compaction_policy.cpp | 7 +-
be/src/olap/cumulative_compaction_policy.h | 5 +-
.../cumulative_compaction_time_series_policy.cpp | 27 +--
.../cumulative_compaction_time_series_policy.h | 8 +-
be/src/olap/olap_server.cpp | 22 ++-
be/src/olap/storage_engine.h | 4 +-
be/src/olap/tablet.cpp | 5 +-
be/src/olap/tablet_manager.cpp | 7 +-
be/src/olap/tablet_manager.h | 3 +-
be/src/olap/tablet_meta.cpp | 49 ++++-
be/src/olap/tablet_meta.h | 35 +++-
be/test/olap/cumulative_compaction_policy_test.cpp | 6 +-
...mulative_compaction_time_series_policy_test.cpp | 28 +--
docs/en/docs/admin-manual/config/be-config.md | 27 ---
.../Create/CREATE-TABLE.md | 31 +++
docs/zh-CN/docs/admin-manual/config/be-config.md | 27 ---
.../Create/CREATE-TABLE.md | 30 +++
.../main/java/org/apache/doris/alter/Alter.java | 30 ++-
.../org/apache/doris/alter/AlterOperations.java | 54 ++++++
.../java/org/apache/doris/alter/RollupJobV2.java | 4 +
.../apache/doris/alter/SchemaChangeHandler.java | 119 +++++++++++-
.../org/apache/doris/alter/SchemaChangeJobV2.java | 4 +
.../analysis/ModifyTablePropertiesClause.java | 99 ++++++++++
.../java/org/apache/doris/backup/RestoreJob.java | 4 +
.../main/java/org/apache/doris/catalog/Env.java | 31 +++
.../java/org/apache/doris/catalog/OlapTable.java | 56 ++++++
.../org/apache/doris/catalog/TableProperty.java | 60 ++++++
.../apache/doris/common/util/PropertyAnalyzer.java | 108 +++++++++++
.../apache/doris/datasource/InternalCatalog.java | 89 ++++++++-
.../org/apache/doris/master/ReportHandler.java | 5 +-
.../org/apache/doris/task/CreateReplicaTask.java | 21 ++-
.../doris/task/UpdateTabletMetaInfoTask.java | 40 ++++
.../java/org/apache/doris/task/AgentTaskTest.java | 2 +-
gensrc/proto/olap_file.proto | 4 +
gensrc/thrift/AgentService.thrift | 8 +
.../test_table_level_compaction_policy.groovy | 207 +++++++++++++++++++++
41 files changed, 1198 insertions(+), 144 deletions(-)
diff --git a/be/src/agent/task_worker_pool.cpp
b/be/src/agent/task_worker_pool.cpp
index 6fa6c88197..de578367a9 100644
--- a/be/src/agent/task_worker_pool.cpp
+++ b/be/src/agent/task_worker_pool.cpp
@@ -433,6 +433,47 @@ void
TaskWorkerPool::_update_tablet_meta_worker_thread_callback() {
tablet->tablet_schema_unlocked()->set_is_in_memory(tablet_meta_info.is_in_memory);
need_to_save = true;
}
+ if (tablet_meta_info.__isset.compaction_policy) {
+ if (tablet_meta_info.compaction_policy != "size_based" &&
+ tablet_meta_info.compaction_policy != "time_series") {
+ status = Status::InvalidArgument(
+ "invalid compaction policy, only support for size_based or "
+ "time_series");
+ continue;
+ }
+
tablet->tablet_meta()->set_compaction_policy(tablet_meta_info.compaction_policy);
+ need_to_save = true;
+ }
+ if
(tablet_meta_info.__isset.time_series_compaction_goal_size_mbytes) {
+ if (tablet->tablet_meta()->compaction_policy() !=
"time_series") {
+ status = Status::InvalidArgument(
+ "only time series compaction policy support time series config");
+ continue;
+ }
+
tablet->tablet_meta()->set_time_series_compaction_goal_size_mbytes(
+
tablet_meta_info.time_series_compaction_goal_size_mbytes);
+ need_to_save = true;
+ }
+ if
(tablet_meta_info.__isset.time_series_compaction_file_count_threshold) {
+ if (tablet->tablet_meta()->compaction_policy() !=
"time_series") {
+ status = Status::InvalidArgument(
+ "only time series compaction policy support time series config");
+ continue;
+ }
+
tablet->tablet_meta()->set_time_series_compaction_file_count_threshold(
+
tablet_meta_info.time_series_compaction_file_count_threshold);
+ need_to_save = true;
+ }
+ if
(tablet_meta_info.__isset.time_series_compaction_time_threshold_seconds) {
+ if (tablet->tablet_meta()->compaction_policy() !=
"time_series") {
+ status = Status::InvalidArgument(
+ "only time series compaction policy support time series config");
+ continue;
+ }
+
tablet->tablet_meta()->set_time_series_compaction_time_threshold_seconds(
+
tablet_meta_info.time_series_compaction_time_threshold_seconds);
+ need_to_save = true;
+ }
if (tablet_meta_info.__isset.replica_id) {
tablet->tablet_meta()->set_replica_id(tablet_meta_info.replica_id);
}
diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 904d575df2..8cf6d75c4e 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -977,20 +977,6 @@ DEFINE_Int32(num_broadcast_buffer, "32");
// semi-structure configs
DEFINE_Bool(enable_parse_multi_dimession_array, "false");
-// Currently, two compaction strategies are implemented, SIZE_BASED and TIME_SERIES.
-// In the case of time series compaction, the execution of compaction is adjusted
-// using parameters that have the prefix time_series_compaction.
-DEFINE_mString(compaction_policy, "size_based");
-DEFINE_Validator(compaction_policy, [](const std::string config) -> bool {
- return config == "size_based" || config == "time_series";
-});
-// the size of input files for each compaction
-DEFINE_mInt64(time_series_compaction_goal_size_mbytes, "512");
-// the minimum number of input files for each compaction if time_series_compaction_goal_size_mbytes not meets
-DEFINE_mInt64(time_series_compaction_file_count_threshold, "2000");
-// if compaction has not been performed within 3600 seconds, a compaction will be triggered
-DEFINE_mInt64(time_series_compaction_time_threshold_seconds, "3600");
-
// max depth of expression tree allowed.
DEFINE_Int32(max_depth_of_expr_tree, "600");
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 466a26a69b..097c1f02bc 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -1010,17 +1010,6 @@ DECLARE_Int32(num_broadcast_buffer);
// semi-structure configs
DECLARE_Bool(enable_parse_multi_dimession_array);
-// Currently, two compaction strategies are implemented, SIZE_BASED and TIME_SERIES.
-// In the case of time series compaction, the execution of compaction is adjusted
-// using parameters that have the prefix time_series_compaction.
-DECLARE_mString(compaction_policy);
-// the size of input files for each compaction
-DECLARE_mInt64(time_series_compaction_goal_size_mbytes);
-// the minimum number of input files for each compaction if time_series_compaction_goal_size_mbytes not meets
-DECLARE_mInt64(time_series_compaction_file_count_threshold);
-// if compaction has not been performed within 3600 seconds, a compaction will be triggered
-DECLARE_mInt64(time_series_compaction_time_threshold_seconds);
-
// max depth of expression tree allowed.
DECLARE_Int32(max_depth_of_expr_tree);
diff --git a/be/src/http/action/compaction_action.cpp
b/be/src/http/action/compaction_action.cpp
index 81f6ad4d30..d94ca60432 100644
--- a/be/src/http/action/compaction_action.cpp
+++ b/be/src/http/action/compaction_action.cpp
@@ -37,6 +37,7 @@
#include "olap/base_compaction.h"
#include "olap/cumulative_compaction.h"
#include "olap/cumulative_compaction_policy.h"
+#include "olap/cumulative_compaction_time_series_policy.h"
#include "olap/full_compaction.h"
#include "olap/olap_define.h"
#include "olap/storage_engine.h"
@@ -198,7 +199,8 @@ Status
CompactionAction::_execute_compaction_callback(TabletSharedPtr tablet,
timer.start();
std::shared_ptr<CumulativeCompactionPolicy> cumulative_compaction_policy =
-
CumulativeCompactionPolicyFactory::create_cumulative_compaction_policy();
+
CumulativeCompactionPolicyFactory::create_cumulative_compaction_policy(
+ tablet->tablet_meta()->compaction_policy());
if (tablet->get_cumulative_compaction_policy() == nullptr) {
tablet->set_cumulative_compaction_policy(cumulative_compaction_policy);
}
diff --git a/be/src/olap/compaction.cpp b/be/src/olap/compaction.cpp
index d67248919f..2f24fbd071 100644
--- a/be/src/olap/compaction.cpp
+++ b/be/src/olap/compaction.cpp
@@ -148,8 +148,10 @@ int64_t Compaction::get_avg_segment_rows() {
// input_rowsets_size is total disk_size of input_rowset, this size is the
// final size after codec and compress, so expect dest segment file size
// in disk is config::vertical_compaction_max_segment_size
- if (config::compaction_policy == CUMULATIVE_TIME_SERIES_POLICY) {
- return (config::time_series_compaction_goal_size_mbytes * 1024 * 1024
* 2) /
+ const auto& meta = _tablet->tablet_meta();
+ if (meta->compaction_policy() == CUMULATIVE_TIME_SERIES_POLICY) {
+ int64_t compaction_goal_size_mbytes =
meta->time_series_compaction_goal_size_mbytes();
+ return (compaction_goal_size_mbytes * 1024 * 1024 * 2) /
(_input_rowsets_size / (_input_row_num + 1) + 1);
}
return config::vertical_compaction_max_segment_size /
diff --git a/be/src/olap/cumulative_compaction_policy.cpp
b/be/src/olap/cumulative_compaction_policy.cpp
index 17773e831e..c3d5047cec 100644
--- a/be/src/olap/cumulative_compaction_policy.cpp
+++ b/be/src/olap/cumulative_compaction_policy.cpp
@@ -368,9 +368,12 @@ int64_t
SizeBasedCumulativeCompactionPolicy::_level_size(const int64_t size) {
}
std::shared_ptr<CumulativeCompactionPolicy>
-CumulativeCompactionPolicyFactory::create_cumulative_compaction_policy() {
- if (config::compaction_policy == CUMULATIVE_TIME_SERIES_POLICY) {
+CumulativeCompactionPolicyFactory::create_cumulative_compaction_policy(
+ const std::string_view& compaction_policy) {
+ if (compaction_policy == CUMULATIVE_TIME_SERIES_POLICY) {
return std::make_shared<TimeSeriesCumulativeCompactionPolicy>();
+ } else if (compaction_policy == CUMULATIVE_SIZE_BASED_POLICY) {
+ return std::make_shared<SizeBasedCumulativeCompactionPolicy>();
}
return std::make_shared<SizeBasedCumulativeCompactionPolicy>();
}
diff --git a/be/src/olap/cumulative_compaction_policy.h
b/be/src/olap/cumulative_compaction_policy.h
index 65f970e4af..836100903b 100644
--- a/be/src/olap/cumulative_compaction_policy.h
+++ b/be/src/olap/cumulative_compaction_policy.h
@@ -33,7 +33,7 @@ namespace doris {
class Tablet;
struct Version;
-inline std::string_view CUMULATIVE_SIZE_BASED_POLICY = "size_based";
+inline constexpr std::string_view CUMULATIVE_SIZE_BASED_POLICY = "size_based";
/// This class CumulativeCompactionPolicy is the base class of cumulative compaction policy.
/// It defines the policy to do cumulative compaction. It has different derived classes, which implements
@@ -176,7 +176,8 @@ class CumulativeCompactionPolicyFactory {
public:
/// Static factory function. It can product different policy according to the `policy` parameter and use tablet ptr
/// to construct the policy. Now it can product size based and num based policies.
- static std::shared_ptr<CumulativeCompactionPolicy>
create_cumulative_compaction_policy();
+ static std::shared_ptr<CumulativeCompactionPolicy>
create_cumulative_compaction_policy(
+ const std::string_view& compaction_policy);
};
} // namespace doris
diff --git a/be/src/olap/cumulative_compaction_time_series_policy.cpp
b/be/src/olap/cumulative_compaction_time_series_policy.cpp
index ac816d0e54..1e144b6a2c 100644
--- a/be/src/olap/cumulative_compaction_time_series_policy.cpp
+++ b/be/src/olap/cumulative_compaction_time_series_policy.cpp
@@ -63,13 +63,15 @@ uint32_t
TimeSeriesCumulativeCompactionPolicy::calc_cumulative_compaction_score(
return 0;
}
- // Condition 1: the size of input files for compaction meets the
requirement of parameter _compaction_goal_size
- if (total_size >= (config::time_series_compaction_goal_size_mbytes * 1024
* 1024)) {
+ // Condition 1: the size of input files for compaction meets the
requirement of parameter compaction_goal_size
+ int64_t compaction_goal_size_mbytes =
+ tablet->tablet_meta()->time_series_compaction_goal_size_mbytes();
+ if (total_size >= compaction_goal_size_mbytes * 1024 * 1024) {
return score;
}
- // Condition 2: the number of input files reaches the threshold specified
by parameter _compaction_file_count_threshold
- if (score >= config::time_series_compaction_file_count_threshold) {
+ // Condition 2: the number of input files reaches the threshold specified
by parameter compaction_file_count_threshold
+ if (score >=
tablet->tablet_meta()->time_series_compaction_file_count_threshold()) {
return score;
}
@@ -79,7 +81,8 @@ uint32_t
TimeSeriesCumulativeCompactionPolicy::calc_cumulative_compaction_score(
int64_t cumu_interval = now - last_cumu;
// Condition 3: the time interval between compactions exceeds the
value specified by parameter _compaction_time_threshold_second
- if (cumu_interval >
(config::time_series_compaction_time_threshold_seconds * 1000)) {
+ if (cumu_interval >
+
(tablet->tablet_meta()->time_series_compaction_time_threshold_seconds() *
1000)) {
return score;
}
} else if (score > 0) {
@@ -199,8 +202,9 @@ int
TimeSeriesCumulativeCompactionPolicy::pick_input_rowsets(
transient_size += 1;
input_rowsets->push_back(rowset);
- // Condition 1: the size of input files for compaction meets the
requirement of parameter _compaction_goal_size
- if (total_size >= (config::time_series_compaction_goal_size_mbytes *
1024 * 1024)) {
+ // Condition 1: the size of input files for compaction meets the
requirement of parameter compaction_goal_size
+ if (total_size >=
+ (tablet->tablet_meta()->time_series_compaction_goal_size_mbytes()
* 1024 * 1024)) {
if (input_rowsets->size() == 1 &&
!input_rowsets->front()->rowset_meta()->is_segments_overlapping()) {
// Only 1 non-overlapping rowset, skip it
@@ -225,8 +229,8 @@ int
TimeSeriesCumulativeCompactionPolicy::pick_input_rowsets(
return transient_size;
}
- // Condition 2: the number of input files reaches the threshold specified
by parameter _compaction_file_count_threshold
- if (*compaction_score >=
config::time_series_compaction_file_count_threshold) {
+ // Condition 2: the number of input files reaches the threshold specified
by parameter compaction_file_count_threshold
+ if (*compaction_score >=
tablet->tablet_meta()->time_series_compaction_file_count_threshold()) {
return transient_size;
}
@@ -235,8 +239,9 @@ int
TimeSeriesCumulativeCompactionPolicy::pick_input_rowsets(
if (last_cumu != 0) {
int64_t cumu_interval = now - last_cumu;
- // Condition 3: the time interval between compactions exceeds the
value specified by parameter _compaction_time_threshold_second
- if (cumu_interval >
(config::time_series_compaction_time_threshold_seconds * 1000)) {
+ // Condition 3: the time interval between compactions exceeds the
value specified by parameter compaction_time_threshold_second
+ if (cumu_interval >
+
(tablet->tablet_meta()->time_series_compaction_time_threshold_seconds() *
1000)) {
return transient_size;
}
}
diff --git a/be/src/olap/cumulative_compaction_time_series_policy.h
b/be/src/olap/cumulative_compaction_time_series_policy.h
index 3fc3362fa8..1bae5ecfb1 100644
--- a/be/src/olap/cumulative_compaction_time_series_policy.h
+++ b/be/src/olap/cumulative_compaction_time_series_policy.h
@@ -21,13 +21,13 @@
namespace doris {
-inline std::string_view CUMULATIVE_TIME_SERIES_POLICY = "time_series";
+inline constexpr std::string_view CUMULATIVE_TIME_SERIES_POLICY =
"time_series";
/// TimeSeries cumulative compaction policy implementation.
/// The following three conditions will be considered when calculating
compaction scores and selecting input rowsets in this policy:
-/// Condition 1: the size of input files for compaction meets the requirement of parameter _compaction_goal_size
-/// Condition 2: the number of input files reaches the threshold specified by parameter _compaction_file_count_threshold
-/// Condition 3: the time interval between compactions exceeds the value specified by parameter _compaction_time_threshold_seconds
+/// Condition 1: the size of input files for compaction meets the requirement of time_series_compaction_goal_size_mbytes
+/// Condition 2: the number of input files reaches the threshold specified by time_series_compaction_file_count_threshold
+/// Condition 3: the time interval between compactions exceeds the value specified by time_series_compaction_time_threshold_seconds
/// The conditions are evaluated sequentially, starting with Condition 1.
/// If any condition is met, the compaction score calculation or selection of
input rowsets will be successful.
class TimeSeriesCumulativeCompactionPolicy final : public
CumulativeCompactionPolicy {
diff --git a/be/src/olap/olap_server.cpp b/be/src/olap/olap_server.cpp
index 44ec7304e5..afb644d39e 100644
--- a/be/src/olap/olap_server.cpp
+++ b/be/src/olap/olap_server.cpp
@@ -52,6 +52,7 @@
#include "olap/cold_data_compaction.h"
#include "olap/compaction_permit_limiter.h"
#include "olap/cumulative_compaction_policy.h"
+#include "olap/cumulative_compaction_time_series_policy.h"
#include "olap/data_dir.h"
#include "olap/olap_common.h"
#include "olap/rowset/segcompaction.h"
@@ -833,7 +834,7 @@ std::vector<TabletSharedPtr>
StorageEngine::_generate_compaction_tasks(
compaction_type == CompactionType::CUMULATIVE_COMPACTION
? copied_cumu_map[data_dir]
: copied_base_map[data_dir],
- &disk_max_score, _cumulative_compaction_policy);
+ &disk_max_score, _cumulative_compaction_policies);
if (tablet != nullptr) {
if
(!tablet->tablet_meta()->tablet_schema()->disable_auto_compaction()) {
if (need_pick_tablet) {
@@ -863,9 +864,13 @@ std::vector<TabletSharedPtr>
StorageEngine::_generate_compaction_tasks(
}
void StorageEngine::_update_cumulative_compaction_policy() {
- if (_cumulative_compaction_policy == nullptr) {
- _cumulative_compaction_policy =
-
CumulativeCompactionPolicyFactory::create_cumulative_compaction_policy();
+ if (_cumulative_compaction_policies.empty()) {
+ _cumulative_compaction_policies[CUMULATIVE_SIZE_BASED_POLICY] =
+
CumulativeCompactionPolicyFactory::create_cumulative_compaction_policy(
+ CUMULATIVE_SIZE_BASED_POLICY);
+ _cumulative_compaction_policies[CUMULATIVE_TIME_SERIES_POLICY] =
+
CumulativeCompactionPolicyFactory::create_cumulative_compaction_policy(
+ CUMULATIVE_TIME_SERIES_POLICY);
}
}
@@ -975,8 +980,13 @@ Status
StorageEngine::_submit_compaction_task(TabletSharedPtr tablet,
Status StorageEngine::submit_compaction_task(TabletSharedPtr tablet,
CompactionType compaction_type,
bool force) {
_update_cumulative_compaction_policy();
- if (tablet->get_cumulative_compaction_policy() == nullptr) {
-
tablet->set_cumulative_compaction_policy(_cumulative_compaction_policy);
+ // alter table tableName set ("compaction_policy"="time_series")
+ // if atler table's compaction policy, we need to modify tablet
compaction policy shared ptr
+ if (tablet->get_cumulative_compaction_policy() == nullptr ||
+ tablet->get_cumulative_compaction_policy()->name() !=
+ tablet->tablet_meta()->compaction_policy()) {
+ tablet->set_cumulative_compaction_policy(
+
_cumulative_compaction_policies.at(tablet->tablet_meta()->compaction_policy()));
}
tablet->set_skip_compaction(false);
return _submit_compaction_task(tablet, compaction_type, force);
diff --git a/be/src/olap/storage_engine.h b/be/src/olap/storage_engine.h
index e1dfee1d79..274ba19ea9 100644
--- a/be/src/olap/storage_engine.h
+++ b/be/src/olap/storage_engine.h
@@ -460,7 +460,9 @@ private:
std::shared_ptr<StreamLoadRecorder> _stream_load_recorder;
- std::shared_ptr<CumulativeCompactionPolicy> _cumulative_compaction_policy;
+ // we use unordered_map to store all cumulative compaction policy sharded
ptr
+ std::unordered_map<std::string_view,
std::shared_ptr<CumulativeCompactionPolicy>>
+ _cumulative_compaction_policies;
scoped_refptr<Thread> _cooldown_tasks_producer_thread;
scoped_refptr<Thread> _remove_unused_remote_files_thread;
diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp
index 9348f78119..5ad13feff1 100644
--- a/be/src/olap/tablet.cpp
+++ b/be/src/olap/tablet.cpp
@@ -284,7 +284,8 @@ Status Tablet::_init_once_action() {
#ifdef BE_TEST
// init cumulative compaction policy by type
_cumulative_compaction_policy =
-
CumulativeCompactionPolicyFactory::create_cumulative_compaction_policy();
+
CumulativeCompactionPolicyFactory::create_cumulative_compaction_policy(
+ _tablet_meta->compaction_policy());
#endif
RowsetVector rowset_vec;
@@ -1062,7 +1063,7 @@ uint32_t Tablet::_calc_base_compaction_score() const {
// In the time series compaction policy, we want the base compaction to be triggered
// when there are delete versions present.
- if (config::compaction_policy == CUMULATIVE_TIME_SERIES_POLICY) {
+ if (_tablet_meta->compaction_policy() == CUMULATIVE_TIME_SERIES_POLICY) {
return (base_rowset_exist && has_delete) ? score : 0;
}
diff --git a/be/src/olap/tablet_manager.cpp b/be/src/olap/tablet_manager.cpp
index 8b68e62406..36d19cf8ed 100644
--- a/be/src/olap/tablet_manager.cpp
+++ b/be/src/olap/tablet_manager.cpp
@@ -40,6 +40,7 @@
#include "gutil/strings/strcat.h"
#include "gutil/strings/substitute.h"
#include "io/fs/local_file_system.h"
+#include "olap/cumulative_compaction_time_series_policy.h"
#include "olap/data_dir.h"
#include "olap/olap_common.h"
#include "olap/olap_define.h"
@@ -678,7 +679,8 @@ void TabletManager::get_tablet_stat(TTabletStatResult*
result) {
TabletSharedPtr TabletManager::find_best_tablet_to_compaction(
CompactionType compaction_type, DataDir* data_dir,
const std::unordered_set<TTabletId>& tablet_submitted_compaction,
uint32_t* score,
- std::shared_ptr<CumulativeCompactionPolicy>
cumulative_compaction_policy) {
+ const std::unordered_map<std::string_view,
std::shared_ptr<CumulativeCompactionPolicy>>&
+ all_cumulative_compaction_policies) {
int64_t now_ms = UnixMillis();
const string& compaction_type_str =
compaction_type == CompactionType::BASE_COMPACTION ? "base" :
"cumulative";
@@ -729,7 +731,8 @@ TabletSharedPtr
TabletManager::find_best_tablet_to_compaction(
continue;
}
}
-
+ auto cumulative_compaction_policy =
all_cumulative_compaction_policies.at(
+ tablet_ptr->tablet_meta()->compaction_policy());
uint32_t current_compaction_score =
tablet_ptr->calc_compaction_score(
compaction_type, cumulative_compaction_policy);
if (current_compaction_score < 5) {
diff --git a/be/src/olap/tablet_manager.h b/be/src/olap/tablet_manager.h
index f7f41dcef1..2371da868a 100644
--- a/be/src/olap/tablet_manager.h
+++ b/be/src/olap/tablet_manager.h
@@ -77,7 +77,8 @@ public:
TabletSharedPtr find_best_tablet_to_compaction(
CompactionType compaction_type, DataDir* data_dir,
const std::unordered_set<TTabletId>& tablet_submitted_compaction,
uint32_t* score,
- std::shared_ptr<CumulativeCompactionPolicy>
cumulative_compaction_policy);
+ const std::unordered_map<std::string_view,
std::shared_ptr<CumulativeCompactionPolicy>>&
+ all_cumulative_compaction_policies);
TabletSharedPtr get_tablet(TTabletId tablet_id, bool include_deleted =
false,
std::string* err = nullptr);
diff --git a/be/src/olap/tablet_meta.cpp b/be/src/olap/tablet_meta.cpp
index 01d206ebe9..612c50280d 100644
--- a/be/src/olap/tablet_meta.cpp
+++ b/be/src/olap/tablet_meta.cpp
@@ -65,7 +65,10 @@ Status TabletMeta::create(const TCreateTabletReq& request,
const TabletUid& tabl
request.__isset.enable_unique_key_merge_on_write
? request.enable_unique_key_merge_on_write
: false,
- std::move(binlog_config));
+ std::move(binlog_config), request.compaction_policy,
+ request.time_series_compaction_goal_size_mbytes,
+ request.time_series_compaction_file_count_threshold,
+ request.time_series_compaction_time_threshold_seconds);
return Status::OK();
}
@@ -81,7 +84,10 @@ TabletMeta::TabletMeta(int64_t table_id, int64_t
partition_id, int64_t tablet_id
TabletUid tablet_uid, TTabletType::type tabletType,
TCompressionType::type compression_type, int64_t
storage_policy_id,
bool enable_unique_key_merge_on_write,
- std::optional<TBinlogConfig> binlog_config)
+ std::optional<TBinlogConfig> binlog_config, std::string
compaction_policy,
+ int64_t time_series_compaction_goal_size_mbytes,
+ int64_t time_series_compaction_file_count_threshold,
+ int64_t time_series_compaction_time_threshold_seconds)
: _tablet_uid(0, 0),
_schema(new TabletSchema),
_delete_bitmap(new DeleteBitmap(tablet_id)) {
@@ -102,6 +108,13 @@ TabletMeta::TabletMeta(int64_t table_id, int64_t
partition_id, int64_t tablet_id
: TabletTypePB::TABLET_TYPE_MEMORY);
tablet_meta_pb.set_enable_unique_key_merge_on_write(enable_unique_key_merge_on_write);
tablet_meta_pb.set_storage_policy_id(storage_policy_id);
+ tablet_meta_pb.set_compaction_policy(compaction_policy);
+ tablet_meta_pb.set_time_series_compaction_goal_size_mbytes(
+ time_series_compaction_goal_size_mbytes);
+ tablet_meta_pb.set_time_series_compaction_file_count_threshold(
+ time_series_compaction_file_count_threshold);
+ tablet_meta_pb.set_time_series_compaction_time_threshold_seconds(
+ time_series_compaction_time_threshold_seconds);
TabletSchemaPB* schema = tablet_meta_pb.mutable_schema();
schema->set_num_short_key_columns(tablet_schema.short_key_column_count);
schema->set_num_rows_per_row_block(config::default_num_rows_per_column_file_block);
@@ -266,7 +279,6 @@ TabletMeta::TabletMeta(int64_t table_id, int64_t
partition_id, int64_t tablet_id
if (tablet_schema.__isset.skip_write_index_on_load) {
schema->set_skip_write_index_on_load(tablet_schema.skip_write_index_on_load);
}
-
if (binlog_config.has_value()) {
BinlogConfig tmp_binlog_config;
tmp_binlog_config = binlog_config.value();
@@ -298,7 +310,13 @@ TabletMeta::TabletMeta(const TabletMeta& b)
_cooldown_meta_id(b._cooldown_meta_id),
_enable_unique_key_merge_on_write(b._enable_unique_key_merge_on_write),
_delete_bitmap(b._delete_bitmap),
- _binlog_config(b._binlog_config) {};
+ _binlog_config(b._binlog_config),
+ _compaction_policy(b._compaction_policy),
+
_time_series_compaction_goal_size_mbytes(b._time_series_compaction_goal_size_mbytes),
+ _time_series_compaction_file_count_threshold(
+ b._time_series_compaction_file_count_threshold),
+ _time_series_compaction_time_threshold_seconds(
+ b._time_series_compaction_time_threshold_seconds) {};
void TabletMeta::init_column_from_tcolumn(uint32_t unique_id, const TColumn&
tcolumn,
ColumnPB* column) {
@@ -562,6 +580,13 @@ void TabletMeta::init_from_pb(const TabletMetaPB&
tablet_meta_pb) {
if (tablet_meta_pb.has_binlog_config()) {
_binlog_config = tablet_meta_pb.binlog_config();
}
+ _compaction_policy = tablet_meta_pb.compaction_policy();
+ _time_series_compaction_goal_size_mbytes =
+ tablet_meta_pb.time_series_compaction_goal_size_mbytes();
+ _time_series_compaction_file_count_threshold =
+ tablet_meta_pb.time_series_compaction_file_count_threshold();
+ _time_series_compaction_time_threshold_seconds =
+ tablet_meta_pb.time_series_compaction_time_threshold_seconds();
}
void TabletMeta::to_meta_pb(TabletMetaPB* tablet_meta_pb) {
@@ -636,6 +661,13 @@ void TabletMeta::to_meta_pb(TabletMetaPB* tablet_meta_pb) {
}
}
_binlog_config.to_pb(tablet_meta_pb->mutable_binlog_config());
+ tablet_meta_pb->set_compaction_policy(compaction_policy());
+ tablet_meta_pb->set_time_series_compaction_goal_size_mbytes(
+ time_series_compaction_goal_size_mbytes());
+ tablet_meta_pb->set_time_series_compaction_file_count_threshold(
+ time_series_compaction_file_count_threshold());
+ tablet_meta_pb->set_time_series_compaction_time_threshold_seconds(
+ time_series_compaction_time_threshold_seconds());
}
uint32_t TabletMeta::mem_size() const {
@@ -860,6 +892,15 @@ bool operator==(const TabletMeta& a, const TabletMeta& b) {
if (a._in_restore_mode != b._in_restore_mode) return false;
if (a._preferred_rowset_type != b._preferred_rowset_type) return false;
if (a._storage_policy_id != b._storage_policy_id) return false;
+ if (a._compaction_policy != b._compaction_policy) return false;
+ if (a._time_series_compaction_goal_size_mbytes !=
b._time_series_compaction_goal_size_mbytes)
+ return false;
+ if (a._time_series_compaction_file_count_threshold !=
+ b._time_series_compaction_file_count_threshold)
+ return false;
+ if (a._time_series_compaction_time_threshold_seconds !=
+ b._time_series_compaction_time_threshold_seconds)
+ return false;
return true;
}
diff --git a/be/src/olap/tablet_meta.h b/be/src/olap/tablet_meta.h
index 70830b82de..c9ddff2417 100644
--- a/be/src/olap/tablet_meta.h
+++ b/be/src/olap/tablet_meta.h
@@ -106,7 +106,11 @@ public:
TabletUid tablet_uid, TTabletType::type tabletType,
TCompressionType::type compression_type, int64_t
storage_policy_id = 0,
bool enable_unique_key_merge_on_write = false,
- std::optional<TBinlogConfig> binlog_config = {});
+ std::optional<TBinlogConfig> binlog_config = {},
+ std::string compaction_policy = "size_based",
+ int64_t time_series_compaction_goal_size_mbytes = 1024,
+ int64_t time_series_compaction_file_count_threshold = 2000,
+ int64_t time_series_compaction_time_threshold_seconds = 3600);
// If need add a filed in TableMeta, filed init copy in copy construct
function
TabletMeta(const TabletMeta& tablet_meta);
TabletMeta(TabletMeta&& tablet_meta) = delete;
@@ -228,6 +232,29 @@ public:
_binlog_config = std::move(binlog_config);
}
+ void set_compaction_policy(std::string compaction_policy) {
+ _compaction_policy = compaction_policy;
+ }
+ std::string compaction_policy() const { return _compaction_policy; }
+ void set_time_series_compaction_goal_size_mbytes(int64_t goal_size_mbytes)
{
+ _time_series_compaction_goal_size_mbytes = goal_size_mbytes;
+ }
+ int64_t time_series_compaction_goal_size_mbytes() const {
+ return _time_series_compaction_goal_size_mbytes;
+ }
+ void set_time_series_compaction_file_count_threshold(int64_t
file_count_threshold) {
+ _time_series_compaction_file_count_threshold = file_count_threshold;
+ }
+ int64_t time_series_compaction_file_count_threshold() const {
+ return _time_series_compaction_file_count_threshold;
+ }
+ void set_time_series_compaction_time_threshold_seconds(int64_t
time_threshold) {
+ _time_series_compaction_time_threshold_seconds = time_threshold;
+ }
+ int64_t time_series_compaction_time_threshold_seconds() const {
+ return _time_series_compaction_time_threshold_seconds;
+ }
+
private:
Status _save_meta(DataDir* data_dir);
@@ -274,6 +301,12 @@ private:
// binlog config
BinlogConfig _binlog_config {};
+ // meta for compaction
+ std::string _compaction_policy;
+ int64_t _time_series_compaction_goal_size_mbytes = 0;
+ int64_t _time_series_compaction_file_count_threshold = 0;
+ int64_t _time_series_compaction_time_threshold_seconds = 0;
+
mutable std::shared_mutex _meta_lock;
};
diff --git a/be/test/olap/cumulative_compaction_policy_test.cpp
b/be/test/olap/cumulative_compaction_policy_test.cpp
index 83b779ceb1..dc14907e3e 100644
--- a/be/test/olap/cumulative_compaction_policy_test.cpp
+++ b/be/test/olap/cumulative_compaction_policy_test.cpp
@@ -340,7 +340,8 @@ TEST_F(TestSizeBasedCumulativeCompactionPolicy,
calc_cumulative_compaction_score
_tablet->calculate_cumulative_point();
std::shared_ptr<CumulativeCompactionPolicy> cumulative_compaction_policy =
-
CumulativeCompactionPolicyFactory::create_cumulative_compaction_policy();
+
CumulativeCompactionPolicyFactory::create_cumulative_compaction_policy(
+ CUMULATIVE_SIZE_BASED_POLICY);
const uint32_t score =
_tablet->calc_compaction_score(CompactionType::CUMULATIVE_COMPACTION,
cumulative_compaction_policy);
@@ -359,7 +360,8 @@ TEST_F(TestSizeBasedCumulativeCompactionPolicy,
calc_cumulative_compaction_score
_tablet->init();
_tablet->calculate_cumulative_point();
std::shared_ptr<CumulativeCompactionPolicy> cumulative_compaction_policy =
-
CumulativeCompactionPolicyFactory::create_cumulative_compaction_policy();
+
CumulativeCompactionPolicyFactory::create_cumulative_compaction_policy(
+ CUMULATIVE_SIZE_BASED_POLICY);
const uint32_t score =
_tablet->calc_compaction_score(CompactionType::CUMULATIVE_COMPACTION,
cumulative_compaction_policy);
diff --git a/be/test/olap/cumulative_compaction_time_series_policy_test.cpp
b/be/test/olap/cumulative_compaction_time_series_policy_test.cpp
index 74bcbe70ac..f9b9117233 100644
--- a/be/test/olap/cumulative_compaction_time_series_policy_test.cpp
+++ b/be/test/olap/cumulative_compaction_time_series_policy_test.cpp
@@ -35,17 +35,17 @@ namespace doris {
class TestTimeSeriesCumulativeCompactionPolicy : public testing::Test {
public:
- TestTimeSeriesCumulativeCompactionPolicy() {}
+ TestTimeSeriesCumulativeCompactionPolicy() = default;
void SetUp() {
- config::compaction_policy = "time_series";
- config::time_series_compaction_goal_size_mbytes = 1024;
- config::time_series_compaction_file_count_threshold = 10;
- config::time_series_compaction_time_threshold_seconds = 3600;
-
_tablet_meta = static_cast<TabletMetaSharedPtr>(new TabletMeta(
1, 2, 15673, 15674, 4, 5, TTabletSchema(), 6, {{7, 8}},
UniqueId(9, 10),
TTabletType::TABLET_TYPE_DISK, TCompressionType::LZ4F));
+
_tablet_meta->set_compaction_policy(std::string(CUMULATIVE_TIME_SERIES_POLICY));
+ _tablet_meta->set_time_series_compaction_goal_size_mbytes(100);
+ _tablet_meta->set_time_series_compaction_file_count_threshold(10);
+ _tablet_meta->set_time_series_compaction_time_threshold_seconds(3600);
+
_json_rowset_meta = R"({
"rowset_id": 540081,
"tablet_id": 15673,
@@ -106,28 +106,28 @@ public:
void init_rs_meta_big_rowset(std::vector<RowsetMetaSharedPtr>* rs_metas) {
RowsetMetaSharedPtr ptr1(new RowsetMeta());
init_rs_meta(ptr1, 0, 1);
- ptr1->set_total_disk_size(1024 * 1024 * 1024);
+ ptr1->set_total_disk_size(100 * 1024 * 1024);
rs_metas->push_back(ptr1);
RowsetMetaSharedPtr ptr2(new RowsetMeta());
init_rs_meta(ptr2, 2, 3);
- ptr2->set_total_disk_size(1024 * 1024 * 1024);
+ ptr2->set_total_disk_size(100 * 1024 * 1024);
rs_metas->push_back(ptr2);
RowsetMetaSharedPtr ptr3(new RowsetMeta());
init_rs_meta(ptr3, 4, 4);
- ptr3->set_total_disk_size(512 * 1024 * 1024);
+ ptr3->set_total_disk_size(51 * 1024 * 1024);
rs_metas->push_back(ptr3);
RowsetMetaSharedPtr ptr4(new RowsetMeta());
init_rs_meta(ptr4, 5, 5);
ptr4->set_segments_overlap(OVERLAPPING);
- ptr4->set_total_disk_size(512 * 1024 * 1024);
+ ptr4->set_total_disk_size(51 * 1024 * 1024);
rs_metas->push_back(ptr4);
RowsetMetaSharedPtr ptr5(new RowsetMeta());
init_rs_meta(ptr5, 6, 6);
- ptr5->set_total_disk_size(512 * 1024 * 1024);
+ ptr5->set_total_disk_size(51 * 1024 * 1024);
rs_metas->push_back(ptr5);
}
@@ -336,7 +336,8 @@ TEST_F(TestTimeSeriesCumulativeCompactionPolicy,
calc_cumulative_compaction_scor
_tablet->calculate_cumulative_point();
std::shared_ptr<CumulativeCompactionPolicy> cumulative_compaction_policy =
-
CumulativeCompactionPolicyFactory::create_cumulative_compaction_policy();
+
CumulativeCompactionPolicyFactory::create_cumulative_compaction_policy(
+ CUMULATIVE_TIME_SERIES_POLICY);
const uint32_t score =
_tablet->calc_compaction_score(CompactionType::CUMULATIVE_COMPACTION,
cumulative_compaction_policy);
@@ -355,7 +356,8 @@ TEST_F(TestTimeSeriesCumulativeCompactionPolicy,
calc_cumulative_compaction_scor
_tablet->init();
_tablet->calculate_cumulative_point();
std::shared_ptr<CumulativeCompactionPolicy> cumulative_compaction_policy =
-
CumulativeCompactionPolicyFactory::create_cumulative_compaction_policy();
+
CumulativeCompactionPolicyFactory::create_cumulative_compaction_policy(
+ CUMULATIVE_TIME_SERIES_POLICY);
const uint32_t score =
_tablet->calc_compaction_score(CompactionType::CUMULATIVE_COMPACTION,
cumulative_compaction_policy);
diff --git a/docs/en/docs/admin-manual/config/be-config.md
b/docs/en/docs/admin-manual/config/be-config.md
index 9ebfabea57..bfc61c56e9 100644
--- a/docs/en/docs/admin-manual/config/be-config.md
+++ b/docs/en/docs/admin-manual/config/be-config.md
@@ -665,33 +665,6 @@ BaseCompaction:546859:
* Description: Minimal interval (s) to update peer replica infos
* Default value: 60 (s)
-#### `compaction_policy`
-
-* Type: string
-* Description: Configure the compaction strategy in the compression phase.
Currently, two compaction strategies are implemented, size_based and
time_series.
- - size_based: Version merging can only be performed when the disk volume of
the rowset is the same order of magnitude. After merging, qualified rowsets are
promoted to the base compaction stage. In the case of a large number of small
batch imports, it can reduce the write magnification of base compact, balance
the read magnification and space magnification, and reduce the data of file
versions.
- - time_series: When the disk size of a rowset accumulates to a certain
threshold, version merging takes place. The merged rowset is directly promoted
to the base compaction stage. This approach effectively reduces the write
amplification rate of compaction, especially in scenarios with continuous
imports in a time series context.
-* Default value: size_based
-
-#### `time_series_compaction_goal_size_mbytes`
-
-* Type: int64
-* Description: Enabling time series compaction will utilize this parameter to
adjust the size of input files for each compaction. The output file size will
be approximately equal to the input file size.
-* Default value: 512
-
-#### `time_series_compaction_file_count_threshold`
-
-* Type: int64
-* Description: Enabling time series compaction will utilize this parameter to
adjust the minimum number of input files for each compaction. It comes into
effect only when the condition specified by
time_series_compaction_goal_size_mbytes is not met.
- - If the number of files in a tablet exceeds the configured threshold, it
will trigger a compaction process.
-* Default value: 2000
-
-#### `time_series_compaction_time_threshold_seconds`
-
-* Type: int64
-* Description: When time series compaction is enabled, a significant duration
passes without a compaction being executed, a compaction will be triggered.
-* Default value: 3600 (s)
-
### Load
diff --git
a/docs/en/docs/sql-manual/sql-reference/Data-Definition-Statements/Create/CREATE-TABLE.md
b/docs/en/docs/sql-manual/sql-reference/Data-Definition-Statements/Create/CREATE-TABLE.md
index 5152fb4957..909eee28eb 100644
---
a/docs/en/docs/sql-manual/sql-reference/Data-Definition-Statements/Create/CREATE-TABLE.md
+++
b/docs/en/docs/sql-manual/sql-reference/Data-Definition-Statements/Create/CREATE-TABLE.md
@@ -415,6 +415,37 @@ Set table properties. The following attributes are
currently supported:
`"skip_write_index_on_load" = "false"`
+* `compaction_policy`
+
+ Configures the compaction policy of this table. The only supported values
are "time_series" and "size_based".
+
+ time_series: When the disk size of a rowset accumulates to a certain
threshold, version merging takes place. The merged rowset is directly promoted
to the base compaction stage. This approach effectively reduces the write
amplification rate of compaction, especially in scenarios with continuous
imports in a time series context.
+
+ In the case of time series compaction, the execution of compaction is
adjusted using parameters that have the prefix time_series_compaction.
+
+ `"compaction_policy" = "time_series"`
+
+* `time_series_compaction_goal_size_mbytes`
+
+ Time series compaction policy will utilize this parameter to adjust the
size of input files for each compaction. The output file size will be
approximately equal to the input file size.
+
+ `"time_series_compaction_goal_size_mbytes" = "1024"`
+
+* `time_series_compaction_file_count_threshold`
+
+ Time series compaction policy will utilize this parameter to adjust the
minimum number of input files for each compaction.
+
+ If the number of files in a tablet exceeds the configured threshold, it
will trigger a compaction process.
+
+ `"time_series_compaction_file_count_threshold" = "2000"`
+
+* `time_series_compaction_time_threshold_seconds`
+
+ When the time series compaction policy is applied, a compaction is triggered
once this many seconds have passed without a compaction being executed.
+
+ `"time_series_compaction_time_threshold_seconds" = "3600"`
+
+
* Dynamic partition related
The relevant parameters of dynamic partition are as follows:
diff --git a/docs/zh-CN/docs/admin-manual/config/be-config.md
b/docs/zh-CN/docs/admin-manual/config/be-config.md
index 5c3847123b..7a3ac0315a 100644
--- a/docs/zh-CN/docs/admin-manual/config/be-config.md
+++ b/docs/zh-CN/docs/admin-manual/config/be-config.md
@@ -679,33 +679,6 @@ BaseCompaction:546859:
* 描述:更新 peer replica infos 的最小间隔时间
* 默认值:60(s)
-#### `compaction_policy`
-
-* 类型:string
-* 描述:配置 compaction 的合并策略,目前实现了两种合并策略,size_based 和 time_series
- - size_based: 仅当 rowset 的磁盘体积在相同数量级时才进行版本合并。合并之后满足条件的 rowset 进行晋升到 base
compaction阶段。能够做到在大量小批量导入的情况下:降低base
compact的写入放大率,并在读取放大率和空间放大率之间进行权衡,同时减少了文件版本的数据。
- - time_series: 当 rowset 的磁盘体积积攒到一定大小时进行版本合并。合并后的 rowset 直接晋升到 base
compaction 阶段。在时序场景持续导入的情况下有效降低 compact 的写入放大率。
-* 默认值:size_based
-
-#### `time_series_compaction_goal_size_mbytes`
-
-* 类型:int64
-* 描述:开启 time series compaction 时,将使用此参数来调整每次 compaction 输入的文件的大小,输出的文件大小和输入相当
-* 默认值:512
-
-#### `time_series_compaction_file_count_threshold`
-
-* 类型:int64
-* 描述:开启 time series compaction 时,将使用此参数来调整每次 compaction 输入的文件数量的最小值,只有当
time_series_compaction_goal_size_mbytes 条件不满足时,该参数才会发挥作用
- - 一个 tablet 中文件数超过该配置,会触发 compaction
-* 默认值:2000
-
-#### `time_series_compaction_time_threshold_seconds`
-
-* 类型:int64
-* 描述:开启 time series compaction 时,将使用此参数来调整 compaction 的最长时间间隔,即长时间未执行过
compaction 时,就会触发一次 compaction,单位为秒
-* 默认值:3600
-
### 导入
diff --git
a/docs/zh-CN/docs/sql-manual/sql-reference/Data-Definition-Statements/Create/CREATE-TABLE.md
b/docs/zh-CN/docs/sql-manual/sql-reference/Data-Definition-Statements/Create/CREATE-TABLE.md
index de0d723987..f9fcff5efe 100644
---
a/docs/zh-CN/docs/sql-manual/sql-reference/Data-Definition-Statements/Create/CREATE-TABLE.md
+++
b/docs/zh-CN/docs/sql-manual/sql-reference/Data-Definition-Statements/Create/CREATE-TABLE.md
@@ -398,6 +398,36 @@ UNIQUE KEY(k1, k2)
`"skip_write_index_on_load" = "false"`
+* `compaction_policy`
+
+ 配置这个表的 compaction 的合并策略,仅支持配置为 time_series 或者 size_based
+
+ time_series: 当 rowset 的磁盘体积积攒到一定大小时进行版本合并。合并后的 rowset 直接晋升到 base
compaction 阶段。在时序场景持续导入的情况下有效降低 compact 的写入放大率
+
+ 此策略将使用 time_series_compaction 为前缀的参数调整 compaction 的执行
+
+ `"compaction_policy" = "time_series"`
+
+* `time_series_compaction_goal_size_mbytes`
+
+ compaction 的合并策略为 time_series 时,将使用此参数来调整每次 compaction
输入的文件的大小,输出的文件大小和输入相当
+
+ `"time_series_compaction_goal_size_mbytes" = "1024"`
+
+* `time_series_compaction_file_count_threshold`
+
+ compaction 的合并策略为 time_series 时,将使用此参数来调整每次 compaction 输入的文件数量的最小值
+
+ 一个 tablet 中,文件数超过该配置,就会触发 compaction
+
+ `"time_series_compaction_file_count_threshold" = "2000"`
+
+* `time_series_compaction_time_threshold_seconds`
+
+ compaction 的合并策略为 time_series 时,将使用此参数来调整 compaction 的最长时间间隔,即长时间未执行过
compaction 时,就会触发一次 compaction,单位为秒
+
+ `"time_series_compaction_time_threshold_seconds" = "3600"`
+
* 动态分区相关
动态分区相关参数如下:
diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java
b/fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java
index 53eb9052e8..f8c90b8a52 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java
@@ -211,6 +211,28 @@ public class Alter {
} else if (currentAlterOps.checkIsBeingSynced(alterClauses)) {
olapTable.setIsBeingSynced(currentAlterOps.isBeingSynced(alterClauses));
needProcessOutsideTableLock = true;
+ } else if (currentAlterOps.checkCompactionPolicy(alterClauses)
+ // Compare by value: `!=` on Strings tests object identity, which can differ
+ // even when the policy text is the same. The left operand is never null
+ // (AlterOperations.getCompactionPolicy falls back to ""), and equals() is
+ // null-safe for the table-side value.
+ && !currentAlterOps.getCompactionPolicy(alterClauses)
+ .equals(olapTable.getCompactionPolicy())) {
+ olapTable.setCompactionPolicy(currentAlterOps.getCompactionPolicy(alterClauses));
+ needProcessOutsideTableLock = true;
+ } else if
(currentAlterOps.checkTimeSeriesCompactionGoalSizeMbytes(alterClauses)
+ &&
currentAlterOps.getTimeSeriesCompactionGoalSizeMbytes(alterClauses)
+ !=
olapTable.getTimeSeriesCompactionGoalSizeMbytes()) {
+ olapTable.setTimeSeriesCompactionGoalSizeMbytes(currentAlterOps
+
.getTimeSeriesCompactionGoalSizeMbytes(alterClauses));
+ needProcessOutsideTableLock = true;
+ } else if
(currentAlterOps.checkTimeSeriesCompactionFileCountThreshold(alterClauses)
+ &&
currentAlterOps.getTimeSeriesCompactionFileCountThreshold(alterClauses)
+ !=
olapTable.getTimeSeriesCompactionFileCountThreshold()) {
+ olapTable.setTimeSeriesCompactionFileCountThreshold(currentAlterOps
+
.getTimeSeriesCompactionFileCountThreshold(alterClauses));
+ needProcessOutsideTableLock = true;
+ } else if
(currentAlterOps.checkTimeSeriesCompactionTimeThresholdSeconds(alterClauses)
+ &&
currentAlterOps.getTimeSeriesCompactionTimeThresholdSeconds(alterClauses)
+ !=
olapTable.getTimeSeriesCompactionTimeThresholdSeconds()) {
+
olapTable.setTimeSeriesCompactionTimeThresholdSeconds(currentAlterOps
+
.getTimeSeriesCompactionTimeThresholdSeconds(alterClauses));
+ needProcessOutsideTableLock = true;
} else if (currentAlterOps.checkBinlogConfigChange(alterClauses)) {
if (!Config.enable_feature_binlog) {
throw new DdlException("Binlog feature is not enabled");
@@ -514,7 +536,13 @@ public class Alter {
// currently, only in memory and storage policy property could
reach here
Preconditions.checkState(properties.containsKey(PropertyAnalyzer.PROPERTIES_INMEMORY)
||
properties.containsKey(PropertyAnalyzer.PROPERTIES_STORAGE_POLICY)
- ||
properties.containsKey(PropertyAnalyzer.PROPERTIES_IS_BEING_SYNCED));
+ ||
properties.containsKey(PropertyAnalyzer.PROPERTIES_IS_BEING_SYNCED)
+ ||
properties.containsKey(PropertyAnalyzer.PROPERTIES_COMPACTION_POLICY)
+ ||
properties.containsKey(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_GOAL_SIZE_MBYTES)
+ || properties
+
.containsKey(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_FILE_COUNT_THRESHOLD)
+ || properties
+
.containsKey(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_TIME_THRESHOLD_SECONDS));
((SchemaChangeHandler)
schemaChangeHandler).updateTableProperties(db, tableName, properties);
} else {
throw new DdlException("Invalid alter operation: " +
alterClause.getOpType());
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/alter/AlterOperations.java
b/fe/fe-core/src/main/java/org/apache/doris/alter/AlterOperations.java
index 22104522ed..62721fa37e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/alter/AlterOperations.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/alter/AlterOperations.java
@@ -93,6 +93,60 @@ public class AlterOperations {
||
clause.getProperties().containsKey(PropertyAnalyzer.PROPERTIES_BINLOG_MAX_HISTORY_NUMS));
}
+ /**
+  * Returns true if any ModifyTablePropertiesClause in {@code alterClauses}
+  * carries the given property key. Shared by the compaction check* methods below.
+  */
+ private static boolean hasTableProperty(List<AlterClause> alterClauses, String propertyKey) {
+     return alterClauses.stream()
+             .filter(clause -> clause instanceof ModifyTablePropertiesClause)
+             .anyMatch(clause -> clause.getProperties().containsKey(propertyKey));
+ }
+
+ /** Returns true if some properties clause sets the compaction policy. */
+ public boolean checkCompactionPolicy(List<AlterClause> alterClauses) {
+     return hasTableProperty(alterClauses, PropertyAnalyzer.PROPERTIES_COMPACTION_POLICY);
+ }
+
+ /**
+  * Returns the compaction policy held by the first ModifyTablePropertiesClause,
+  * or "" when the list contains no such clause.
+  * NOTE(review): Stream.findFirst() throws NullPointerException when the selected
+  * element is null, so the first properties clause must already hold a non-null
+  * policy; confirm callers always guard this with checkCompactionPolicy().
+  */
+ public String getCompactionPolicy(List<AlterClause> alterClauses) {
+     return alterClauses.stream()
+             .filter(clause -> clause instanceof ModifyTablePropertiesClause)
+             .map(clause -> ((ModifyTablePropertiesClause) clause).compactionPolicy())
+             .findFirst()
+             .orElse("");
+ }
+
+ /** Returns true if some properties clause sets time_series_compaction_goal_size_mbytes. */
+ public boolean checkTimeSeriesCompactionGoalSizeMbytes(List<AlterClause> alterClauses) {
+     return hasTableProperty(alterClauses,
+             PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_GOAL_SIZE_MBYTES);
+ }
+
+ /**
+  * Returns the goal size held by the first ModifyTablePropertiesClause,
+  * or -1 when the list contains no such clause.
+  */
+ public long getTimeSeriesCompactionGoalSizeMbytes(List<AlterClause> alterClauses) {
+     return alterClauses.stream()
+             .filter(clause -> clause instanceof ModifyTablePropertiesClause)
+             .map(clause -> ((ModifyTablePropertiesClause) clause).timeSeriesCompactionGoalSizeMbytes())
+             .findFirst()
+             .orElse(-1L);
+ }
+
+ /** Returns true if some properties clause sets time_series_compaction_file_count_threshold. */
+ public boolean checkTimeSeriesCompactionFileCountThreshold(List<AlterClause> alterClauses) {
+     return hasTableProperty(alterClauses,
+             PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_FILE_COUNT_THRESHOLD);
+ }
+
+ /**
+  * Returns the file-count threshold held by the first ModifyTablePropertiesClause,
+  * or -1 when the list contains no such clause.
+  */
+ public long getTimeSeriesCompactionFileCountThreshold(List<AlterClause> alterClauses) {
+     return alterClauses.stream()
+             .filter(clause -> clause instanceof ModifyTablePropertiesClause)
+             .map(clause -> ((ModifyTablePropertiesClause) clause).timeSeriesCompactionFileCountThreshold())
+             .findFirst()
+             .orElse(-1L);
+ }
+
+ /** Returns true if some properties clause sets time_series_compaction_time_threshold_seconds. */
+ public boolean checkTimeSeriesCompactionTimeThresholdSeconds(List<AlterClause> alterClauses) {
+     return hasTableProperty(alterClauses,
+             PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_TIME_THRESHOLD_SECONDS);
+ }
+
+ /**
+  * Returns the time threshold held by the first ModifyTablePropertiesClause,
+  * or -1 when the list contains no such clause.
+  */
+ public long getTimeSeriesCompactionTimeThresholdSeconds(List<AlterClause> alterClauses) {
+     return alterClauses.stream()
+             .filter(clause -> clause instanceof ModifyTablePropertiesClause)
+             .map(clause -> ((ModifyTablePropertiesClause) clause).timeSeriesCompactionTimeThresholdSeconds())
+             .findFirst()
+             .orElse(-1L);
+ }
+
public boolean isBeingSynced(List<AlterClause> alterClauses) {
return alterClauses.stream().filter(clause ->
clause instanceof ModifyTablePropertiesClause
diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java
b/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java
index e05130b103..5ab9f77a88 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java
@@ -284,6 +284,10 @@ public class RollupJobV2 extends AlterJobV2 implements
GsonPostProcessable {
tbl.disableAutoCompaction(),
tbl.enableSingleReplicaCompaction(),
tbl.skipWriteIndexOnLoad(),
+ tbl.getCompactionPolicy(),
+ tbl.getTimeSeriesCompactionGoalSizeMbytes(),
+
tbl.getTimeSeriesCompactionFileCountThreshold(),
+
tbl.getTimeSeriesCompactionTimeThresholdSeconds(),
tbl.storeRowColumn(),
tbl.isDynamicSchema(),
binlogConfig);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java
b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java
index ff2fcc3fd6..df7fdc81c0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java
@@ -2137,13 +2137,43 @@ public class SchemaChangeHandler extends AlterHandler {
}
long storagePolicyId = storagePolicyNameToId(storagePolicy);
- if (isInMemory < 0 && storagePolicyId < 0) {
- LOG.info("Properties already up-to-date");
- return;
+ String compactionPolicy =
properties.get(PropertyAnalyzer.PROPERTIES_COMPACTION_POLICY);
+ if (compactionPolicy != null
+ &&
!compactionPolicy.equals(PropertyAnalyzer.TIME_SERIES_COMPACTION_POLICY)
+ &&
!compactionPolicy.equals(PropertyAnalyzer.SIZE_BASED_COMPACTION_POLICY)) {
+ throw new UserException("Table compaction policy only support for "
+ +
PropertyAnalyzer.TIME_SERIES_COMPACTION_POLICY
+ + " or " +
PropertyAnalyzer.SIZE_BASED_COMPACTION_POLICY);
+ }
+
+ Map<String, Long> timeSeriesCompactionConfig = new HashMap<>();
+ if
(properties.containsKey(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_GOAL_SIZE_MBYTES))
{
+ timeSeriesCompactionConfig
+
.put(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_GOAL_SIZE_MBYTES,
+ Long.parseLong(properties
+
.get(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_GOAL_SIZE_MBYTES)));
+ }
+ if
(properties.containsKey(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_FILE_COUNT_THRESHOLD))
{
+ timeSeriesCompactionConfig
+
.put(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_FILE_COUNT_THRESHOLD,
+ Long.parseLong(properties
+
.get(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_FILE_COUNT_THRESHOLD)));
+ }
+ if
(properties.containsKey(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_TIME_THRESHOLD_SECONDS))
{
+ timeSeriesCompactionConfig
+
.put(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_TIME_THRESHOLD_SECONDS,
+ Long.parseLong(properties
+
.get(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_TIME_THRESHOLD_SECONDS)));
}
+ // Nothing to change: return before any tablet-meta task is sent to the backends.
+ if (isInMemory < 0 && storagePolicyId < 0 && compactionPolicy == null
&& timeSeriesCompactionConfig.isEmpty()) {
+ LOG.info("Properties already up-to-date");
+ return;
+ }
+
for (Partition partition : partitions) {
- updatePartitionProperties(db, olapTable.getName(),
partition.getName(), storagePolicyId, isInMemory, null);
+ updatePartitionProperties(db, olapTable.getName(),
partition.getName(), storagePolicyId, isInMemory,
+ null, compactionPolicy,
timeSeriesCompactionConfig);
}
olapTable.writeLockOrDdlException();
@@ -2274,6 +2304,87 @@ public class SchemaChangeHandler extends AlterHandler {
}
}
+ /**
+  * Update one specified partition's properties by partition name of table.
+  * Builds one UpdateTabletMetaInfoTask per backend that hosts a replica of the
+  * partition's visible indexes, submits them and waits for completion.
+  * This operation may return partial successfully, with an exception to inform user to retry.
+  *
+  * @param db database that owns the table
+  * @param tableName name of the OLAP table
+  * @param partitionName name of the partition whose tablets are updated
+  * @param storagePolicyId passed through to UpdateTabletMetaInfoTask
+  * @param isInMemory passed through to UpdateTabletMetaInfoTask
+  * @param binlogConfig passed through to UpdateTabletMetaInfoTask
+  * @param compactionPolicy passed through to UpdateTabletMetaInfoTask
+  * @param timeSeriesCompactionConfig passed through to UpdateTabletMetaInfoTask
+  * @throws UserException e.g. when the partition does not exist, a task reports an error,
+  *         or the tasks do not all finish before the timeout
+  */
+ public void updatePartitionProperties(Database db, String tableName, String partitionName, long storagePolicyId,
+                                       int isInMemory, BinlogConfig binlogConfig, String compactionPolicy,
+                                       Map<String, Long> timeSeriesCompactionConfig) throws UserException {
+     // be id -> <tablet id,schemaHash>
+     Map<Long, Set<Pair<Long, Integer>>> beIdToTabletIdWithHash = Maps.newHashMap();
+     OlapTable olapTable = (OlapTable) db.getTableOrMetaException(tableName, Table.TableType.OLAP);
+     // Only the replica layout is read under the table lock; the tasks are sent after it is released.
+     olapTable.readLock();
+     try {
+         Partition partition = olapTable.getPartition(partitionName);
+         if (partition == null) {
+             throw new DdlException(
+                     "Partition[" + partitionName + "] does not exist in table[" + olapTable.getName() + "]");
+         }
+
+         for (MaterializedIndex index : partition.getMaterializedIndices(IndexExtState.VISIBLE)) {
+             int schemaHash = olapTable.getSchemaHashByIndexId(index.getId());
+             for (Tablet tablet : index.getTablets()) {
+                 for (Replica replica : tablet.getReplicas()) {
+                     Set<Pair<Long, Integer>> tabletIdWithHash = beIdToTabletIdWithHash.computeIfAbsent(
+                             replica.getBackendId(), k -> Sets.newHashSet());
+                     tabletIdWithHash.add(Pair.of(tablet.getId(), schemaHash));
+                 }
+             }
+         }
+     } finally {
+         olapTable.readUnlock();
+     }
+
+     // One task (and one latch mark) per backend.
+     int totalTaskNum = beIdToTabletIdWithHash.size();
+     MarkedCountDownLatch<Long, Set<Pair<Long, Integer>>> countDownLatch =
+             new MarkedCountDownLatch<>(totalTaskNum);
+     AgentBatchTask batchTask = new AgentBatchTask();
+     for (Map.Entry<Long, Set<Pair<Long, Integer>>> kv : beIdToTabletIdWithHash.entrySet()) {
+         countDownLatch.addMark(kv.getKey(), kv.getValue());
+         UpdateTabletMetaInfoTask task = new UpdateTabletMetaInfoTask(kv.getKey(), kv.getValue(), isInMemory,
+                 storagePolicyId, binlogConfig, countDownLatch, compactionPolicy, timeSeriesCompactionConfig);
+         batchTask.addTask(task);
+     }
+     if (!FeConstants.runningUnitTest) {
+         // send all tasks and wait them finished
+         AgentTaskQueue.addBatchTask(batchTask);
+         AgentTaskExecutor.submit(batchTask);
+         LOG.info("send update tablet meta task for table {}, partitions {}, number: {}", tableName, partitionName,
+                 batchTask.getTaskNum());
+
+         // estimate timeout; both products are computed in long so neither can overflow int
+         long timeout = Config.tablet_create_timeout_second * 1000L * totalTaskNum;
+         timeout = Math.min(timeout, Config.max_create_table_timeout_second * 1000L);
+         boolean ok = false;
+         try {
+             ok = countDownLatch.await(timeout, TimeUnit.MILLISECONDS);
+         } catch (InterruptedException e) {
+             LOG.warn("InterruptedException: ", e);
+             // Restore the interrupt flag so code further up the stack can still observe it;
+             // ok stays false, so the failure path below runs.
+             Thread.currentThread().interrupt();
+         }
+
+         if (!ok || !countDownLatch.getStatus().ok()) {
+             String errMsg = "Failed to update partition[" + partitionName + "]. tablet meta.";
+             // clear tasks
+             AgentTaskQueue.removeBatchTask(batchTask, TTaskType.UPDATE_TABLET_META_INFO);
+
+             if (!countDownLatch.getStatus().ok()) {
+                 errMsg += " Error: " + countDownLatch.getStatus().getErrorMsg();
+             } else {
+                 List<Map.Entry<Long, Set<Pair<Long, Integer>>>> unfinishedMarks = countDownLatch.getLeftMarks();
+                 // only show at most 3 results
+                 List<Map.Entry<Long, Set<Pair<Long, Integer>>>> subList = unfinishedMarks.subList(0,
+                         Math.min(unfinishedMarks.size(), 3));
+                 if (!subList.isEmpty()) {
+                     errMsg += " Unfinished mark: " + Joiner.on(", ").join(subList);
+                 }
+             }
+             errMsg += ". This operation maybe partial successfully, You should retry until success.";
+             LOG.warn(errMsg);
+             throw new DdlException(errMsg);
+         }
+     }
+ }
+
@Override
public void cancel(CancelStmt stmt) throws DdlException {
CancelAlterTableStmt cancelAlterTableStmt = (CancelAlterTableStmt)
stmt;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java
b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java
index cc10efa951..2be16f6119 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java
@@ -279,6 +279,10 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
tbl.disableAutoCompaction(),
tbl.enableSingleReplicaCompaction(),
tbl.skipWriteIndexOnLoad(),
+ tbl.getCompactionPolicy(),
+
tbl.getTimeSeriesCompactionGoalSizeMbytes(),
+
tbl.getTimeSeriesCompactionFileCountThreshold(),
+
tbl.getTimeSeriesCompactionTimeThresholdSeconds(),
tbl.storeRowColumn(),
tbl.isDynamicSchema(),
binlogConfig);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/analysis/ModifyTablePropertiesClause.java
b/fe/fe-core/src/main/java/org/apache/doris/analysis/ModifyTablePropertiesClause.java
index 93ece524f2..1ef31b3734 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/analysis/ModifyTablePropertiesClause.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/analysis/ModifyTablePropertiesClause.java
@@ -54,6 +54,46 @@ public class ModifyTablePropertiesClause extends
AlterTableClause {
return isBeingSynced;
}
+ private String compactionPolicy; // stays null until setCompactionPolicy() is called
+
+ private long timeSeriesCompactionGoalSizeMbytes; // 0 until its setter is called
+
+ private long timeSeriesCompactionFileCountThreshold; // 0 until its setter is called
+
+ private long timeSeriesCompactionTimeThresholdSeconds; // 0 until its setter is called
+
+ public void setCompactionPolicy(String compactionPolicy) {
+ this.compactionPolicy = compactionPolicy;
+ }
+
+ public String compactionPolicy() { // may return null if the setter was never called
+ return compactionPolicy;
+ }
+
+ public void setTimeSeriesCompactionGoalSizeMbytes(long
timeSeriesCompactionGoalSizeMbytes) {
+ this.timeSeriesCompactionGoalSizeMbytes =
timeSeriesCompactionGoalSizeMbytes;
+ }
+
+ public long timeSeriesCompactionGoalSizeMbytes() { // primitive return, unlike the two getters below
+ return timeSeriesCompactionGoalSizeMbytes;
+ }
+
+ public void setTimeSeriesCompactionFileCountThreshold(long
timeSeriesCompactionFileCountThreshold) {
+ this.timeSeriesCompactionFileCountThreshold =
timeSeriesCompactionFileCountThreshold;
+ }
+
+ public Long timeSeriesCompactionFileCountThreshold() { // NOTE(review): boxes a primitive field; sibling goal-size getter returns long
+ return timeSeriesCompactionFileCountThreshold;
+ }
+
+ public void setTimeSeriesCompactionTimeThresholdSeconds(long
timeSeriesCompactionTimeThresholdSeconds) {
+ this.timeSeriesCompactionTimeThresholdSeconds =
timeSeriesCompactionTimeThresholdSeconds;
+ }
+
+ public Long timeSeriesCompactionTimeThresholdSeconds() { // NOTE(review): boxes a primitive field; sibling goal-size getter returns long
+ return timeSeriesCompactionTimeThresholdSeconds;
+ }
+
public ModifyTablePropertiesClause(Map<String, String> properties) {
super(AlterOpType.MODIFY_TABLE_PROPERTY);
this.properties = properties;
@@ -142,6 +182,65 @@ public class ModifyTablePropertiesClause extends
AlterTableClause {
||
properties.containsKey(PropertyAnalyzer.PROPERTIES_BINLOG_MAX_BYTES)
||
properties.containsKey(PropertyAnalyzer.PROPERTIES_BINLOG_MAX_HISTORY_NUMS)) {
// do nothing, will be alter in
SchemaChangeHandler.updateBinlogConfig
+ } else if
(properties.containsKey(PropertyAnalyzer.PROPERTIES_COMPACTION_POLICY)) {
+ String compactionPolicy =
properties.getOrDefault(PropertyAnalyzer.PROPERTIES_COMPACTION_POLICY, "");
+ if (compactionPolicy != null
+ &&
!compactionPolicy.equals(PropertyAnalyzer.TIME_SERIES_COMPACTION_POLICY)
+ &&
!compactionPolicy.equals(PropertyAnalyzer.SIZE_BASED_COMPACTION_POLICY)) {
+ throw new AnalysisException(
+ "Table compaction policy only support for " +
PropertyAnalyzer.TIME_SERIES_COMPACTION_POLICY
+ + " or " +
PropertyAnalyzer.SIZE_BASED_COMPACTION_POLICY);
+ }
+ this.needTableStable = false;
+ setCompactionPolicy(compactionPolicy);
+ } else if
(properties.containsKey(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_GOAL_SIZE_MBYTES))
{
+ long goalSizeMbytes;
+ String goalSizeMbytesStr = properties
+
.get(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_GOAL_SIZE_MBYTES);
+ try {
+ goalSizeMbytes = Long.parseLong(goalSizeMbytesStr);
+ if (goalSizeMbytes < 10) {
+ throw new
AnalysisException("time_series_compaction_goal_size_mbytes can not be less than
10:"
+ + goalSizeMbytesStr);
+ }
+ } catch (NumberFormatException e) {
+ throw new AnalysisException("Invalid
time_series_compaction_goal_size_mbytes format: "
+ + goalSizeMbytesStr);
+ }
+ this.needTableStable = false;
+ setTimeSeriesCompactionGoalSizeMbytes(goalSizeMbytes);
+ } else if
(properties.containsKey(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_FILE_COUNT_THRESHOLD))
{
+ long fileCountThreshold;
+ String fileCountThresholdStr = properties
+
.get(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_FILE_COUNT_THRESHOLD);
+ try {
+ fileCountThreshold = Long.parseLong(fileCountThresholdStr);
+ if (fileCountThreshold < 10) {
+ throw new
AnalysisException("time_series_compaction_file_count_threshold can not be less
than 10:"
+
+ fileCountThresholdStr);
+ }
+ } catch (NumberFormatException e) {
+ throw new AnalysisException("Invalid
time_series_compaction_file_count_threshold format: "
+
+ fileCountThresholdStr);
+ }
+ this.needTableStable = false;
+ setTimeSeriesCompactionFileCountThreshold(fileCountThreshold);
+ } else if
(properties.containsKey(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_TIME_THRESHOLD_SECONDS))
{
+ long timeThresholdSeconds;
+ String timeThresholdSecondsStr = properties
+
.get(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_TIME_THRESHOLD_SECONDS);
+ try {
+ timeThresholdSeconds = Long.parseLong(timeThresholdSecondsStr);
+ if (timeThresholdSeconds < 60) {
+ throw new
AnalysisException("time_series_compaction_time_threshold_seconds can not be
less than 60:"
+
+ timeThresholdSecondsStr);
+ }
+ } catch (NumberFormatException e) {
+ throw new AnalysisException("Invalid
time_series_compaction_time_threshold_seconds format: "
+
+ timeThresholdSecondsStr);
+ }
+ this.needTableStable = false;
+ setTimeSeriesCompactionTimeThresholdSeconds(timeThresholdSeconds);
} else {
throw new AnalysisException("Unknown table property: " +
properties.keySet());
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
index a5256dac64..e1c0e4a29e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
@@ -1043,6 +1043,10 @@ public class RestoreJob extends AbstractJob {
localTbl.disableAutoCompaction(),
localTbl.enableSingleReplicaCompaction(),
localTbl.skipWriteIndexOnLoad(),
+ localTbl.getCompactionPolicy(),
+ localTbl.getTimeSeriesCompactionGoalSizeMbytes(),
+
localTbl.getTimeSeriesCompactionFileCountThreshold(),
+
localTbl.getTimeSeriesCompactionTimeThresholdSeconds(),
localTbl.storeRowColumn(),
localTbl.isDynamicSchema(),
binlogConfig);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
index 7903ebac44..30fe9ab59c 100755
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
@@ -3136,6 +3136,37 @@ public class Env {
sb.append(olapTable.skipWriteIndexOnLoad()).append("\"");
}
+ // compaction policy
+ if (olapTable.getCompactionPolicy() != null &&
!olapTable.getCompactionPolicy().equals("")
+ &&
!olapTable.getCompactionPolicy().equals(PropertyAnalyzer.SIZE_BASED_COMPACTION_POLICY))
{
+
sb.append(",\n\"").append(PropertyAnalyzer.PROPERTIES_COMPACTION_POLICY).append("\"
= \"");
+ sb.append(olapTable.getCompactionPolicy()).append("\"");
+ }
+
+ // time series compaction goal size
+ if (olapTable.getCompactionPolicy() != null &&
olapTable.getCompactionPolicy()
+
.equals(PropertyAnalyzer.TIME_SERIES_COMPACTION_POLICY)) {
+ sb.append(",\n\"").append(PropertyAnalyzer
+
.PROPERTIES_TIME_SERIES_COMPACTION_GOAL_SIZE_MBYTES).append("\" = \"");
+
sb.append(olapTable.getTimeSeriesCompactionGoalSizeMbytes()).append("\"");
+ }
+
+ // time series compaction file count threshold
+ if (olapTable.getCompactionPolicy() != null &&
olapTable.getCompactionPolicy()
+
.equals(PropertyAnalyzer.TIME_SERIES_COMPACTION_POLICY)) {
+ sb.append(",\n\"").append(PropertyAnalyzer
+
.PROPERTIES_TIME_SERIES_COMPACTION_FILE_COUNT_THRESHOLD).append("\" = \"");
+
sb.append(olapTable.getTimeSeriesCompactionFileCountThreshold()).append("\"");
+ }
+
+ // time series compaction time threshold
+ if (olapTable.getCompactionPolicy() != null &&
olapTable.getCompactionPolicy()
+
.equals(PropertyAnalyzer.TIME_SERIES_COMPACTION_POLICY)) {
+ sb.append(",\n\"").append(PropertyAnalyzer
+
.PROPERTIES_TIME_SERIES_COMPACTION_TIME_THRESHOLD_SECONDS).append("\" = \"");
+
sb.append(olapTable.getTimeSeriesCompactionTimeThresholdSeconds()).append("\"");
+ }
+
// dynamic schema
if (olapTable.isDynamicSchema()) {
sb.append(",\n\"").append(PropertyAnalyzer.PROPERTIES_DYNAMIC_SCHEMA).append("\"
= \"");
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
index 73baf96656..03df41cbf6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
@@ -1868,6 +1868,62 @@ public class OlapTable extends Table {
return false;
}
+ /**
+  * Stores the table-level compaction policy in the table property map and
+  * refreshes the value cached on the {@code TableProperty}.
+  *
+  * @param compactionPolicy policy name written under the compaction policy property key
+  */
+ public void setCompactionPolicy(String compactionPolicy) {
+     TableProperty props = getOrCreatTableProperty();
+     props.modifyTableProperties(PropertyAnalyzer.PROPERTIES_COMPACTION_POLICY, compactionPolicy);
+     props.buildCompactionPolicy();
+ }
+
+ /**
+  * @return the compaction policy held by the table property, or an empty
+  *         string when this table has no table property
+  */
+ public String getCompactionPolicy() {
+     return tableProperty == null ? "" : tableProperty.compactionPolicy();
+ }
+
+ /**
+  * Stores the time-series compaction goal size in the table property map
+  * (as a decimal string) and refreshes the cached numeric value.
+  *
+  * @param timeSeriesCompactionGoalSizeMbytes goal size to record
+  */
+ public void setTimeSeriesCompactionGoalSizeMbytes(long timeSeriesCompactionGoalSizeMbytes) {
+     TableProperty props = getOrCreatTableProperty();
+     String encoded = Long.toString(timeSeriesCompactionGoalSizeMbytes);
+     props.modifyTableProperties(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_GOAL_SIZE_MBYTES, encoded);
+     props.buildTimeSeriesCompactionGoalSizeMbytes();
+ }
+
+ /**
+  * Returns the time-series compaction goal size for this table.
+  *
+  * <p>When the table has no table property the default value is returned
+  * rather than {@code null}: callers hand the result to primitive
+  * {@code long} parameters or call {@code toString()} on it, so a
+  * {@code null} would surface as a NullPointerException far from here.
+  *
+  * @return the configured goal size, or
+  *         {@code PropertyAnalyzer.TIME_SERIES_COMPACTION_GOAL_SIZE_MBYTES_DEFAULT_VALUE}
+  *         when no table property exists; never {@code null}
+  */
+ public Long getTimeSeriesCompactionGoalSizeMbytes() {
+     if (tableProperty == null) {
+         return PropertyAnalyzer.TIME_SERIES_COMPACTION_GOAL_SIZE_MBYTES_DEFAULT_VALUE;
+     }
+     return tableProperty.timeSeriesCompactionGoalSizeMbytes();
+ }
+
+ /**
+  * Stores the time-series compaction file count threshold in the table
+  * property map (as a decimal string) and refreshes the cached numeric value.
+  *
+  * @param timeSeriesCompactionFileCountThreshold threshold to record
+  */
+ public void setTimeSeriesCompactionFileCountThreshold(long timeSeriesCompactionFileCountThreshold) {
+     TableProperty props = getOrCreatTableProperty();
+     String encoded = Long.toString(timeSeriesCompactionFileCountThreshold);
+     props.modifyTableProperties(
+             PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_FILE_COUNT_THRESHOLD, encoded);
+     props.buildTimeSeriesCompactionFileCountThreshold();
+ }
+
+ /**
+  * Returns the time-series compaction file count threshold for this table.
+  *
+  * <p>When the table has no table property the default value is returned
+  * rather than {@code null}: callers hand the result to primitive
+  * {@code long} parameters or call {@code toString()} on it, so a
+  * {@code null} would surface as a NullPointerException far from here.
+  *
+  * @return the configured threshold, or
+  *         {@code PropertyAnalyzer.TIME_SERIES_COMPACTION_FILE_COUNT_THRESHOLD_DEFAULT_VALUE}
+  *         when no table property exists; never {@code null}
+  */
+ public Long getTimeSeriesCompactionFileCountThreshold() {
+     if (tableProperty == null) {
+         return PropertyAnalyzer.TIME_SERIES_COMPACTION_FILE_COUNT_THRESHOLD_DEFAULT_VALUE;
+     }
+     return tableProperty.timeSeriesCompactionFileCountThreshold();
+ }
+
+ /**
+  * Stores the time-series compaction time threshold in the table property
+  * map (as a decimal string) and refreshes the cached numeric value.
+  *
+  * @param timeSeriesCompactionTimeThresholdSeconds threshold to record
+  */
+ public void setTimeSeriesCompactionTimeThresholdSeconds(long timeSeriesCompactionTimeThresholdSeconds) {
+     TableProperty props = getOrCreatTableProperty();
+     String encoded = Long.toString(timeSeriesCompactionTimeThresholdSeconds);
+     props.modifyTableProperties(
+             PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_TIME_THRESHOLD_SECONDS, encoded);
+     props.buildTimeSeriesCompactionTimeThresholdSeconds();
+ }
+
+ /**
+  * Returns the time-series compaction time threshold for this table.
+  *
+  * <p>When the table has no table property the default value is returned
+  * rather than {@code null}: callers hand the result to primitive
+  * {@code long} parameters or call {@code toString()} on it, so a
+  * {@code null} would surface as a NullPointerException far from here.
+  *
+  * @return the configured threshold, or
+  *         {@code PropertyAnalyzer.TIME_SERIES_COMPACTION_TIME_THRESHOLD_SECONDS_DEFAULT_VALUE}
+  *         when no table property exists; never {@code null}
+  */
+ public Long getTimeSeriesCompactionTimeThresholdSeconds() {
+     if (tableProperty == null) {
+         return PropertyAnalyzer.TIME_SERIES_COMPACTION_TIME_THRESHOLD_SECONDS_DEFAULT_VALUE;
+     }
+     return tableProperty.timeSeriesCompactionTimeThresholdSeconds();
+ }
+
public Boolean isDynamicSchema() {
if (tableProperty != null) {
return tableProperty.isDynamicSchema();
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/catalog/TableProperty.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/TableProperty.java
index bced77425b..c8b6cdb53e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/TableProperty.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/TableProperty.java
@@ -86,6 +86,17 @@ public class TableProperty implements Writable {
private boolean skipWriteIndexOnLoad = false;
+ private String compactionPolicy =
PropertyAnalyzer.SIZE_BASED_COMPACTION_POLICY;
+
+ private long timeSeriesCompactionGoalSizeMbytes
+ =
PropertyAnalyzer.TIME_SERIES_COMPACTION_GOAL_SIZE_MBYTES_DEFAULT_VALUE;
+
+ private long timeSeriesCompactionFileCountThreshold
+ =
PropertyAnalyzer.TIME_SERIES_COMPACTION_FILE_COUNT_THRESHOLD_DEFAULT_VALUE;
+
+ private long timeSeriesCompactionTimeThresholdSeconds
+ =
PropertyAnalyzer.TIME_SERIES_COMPACTION_TIME_THRESHOLD_SECONDS_DEFAULT_VALUE;
+
private DataSortInfo dataSortInfo = new DataSortInfo();
public TableProperty(Map<String, String> properties) {
@@ -214,6 +225,51 @@ public class TableProperty implements Writable {
return skipWriteIndexOnLoad;
}
+ /**
+  * Reloads the cached compaction policy from the raw property map, using
+  * the size-based policy when the key is absent.
+  *
+  * @return this instance, so build calls can be chained
+  */
+ public TableProperty buildCompactionPolicy() {
+     this.compactionPolicy = properties.getOrDefault(
+             PropertyAnalyzer.PROPERTIES_COMPACTION_POLICY,
+             PropertyAnalyzer.SIZE_BASED_COMPACTION_POLICY);
+     return this;
+ }
+
+ /** @return the cached compaction policy */
+ public String compactionPolicy() {
+     return this.compactionPolicy;
+ }
+
+ /**
+  * Reloads the cached time-series compaction goal size from the raw
+  * property map, using the default value when the key is absent.
+  *
+  * @return this instance, so build calls can be chained
+  */
+ public TableProperty buildTimeSeriesCompactionGoalSizeMbytes() {
+     String fallback = String.valueOf(PropertyAnalyzer.TIME_SERIES_COMPACTION_GOAL_SIZE_MBYTES_DEFAULT_VALUE);
+     String raw = properties.getOrDefault(
+             PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_GOAL_SIZE_MBYTES, fallback);
+     this.timeSeriesCompactionGoalSizeMbytes = Long.parseLong(raw);
+     return this;
+ }
+
+ /** @return the cached time-series compaction goal size */
+ public long timeSeriesCompactionGoalSizeMbytes() {
+     return this.timeSeriesCompactionGoalSizeMbytes;
+ }
+
+ /**
+  * Reloads the cached time-series compaction file count threshold from the
+  * raw property map, using the default value when the key is absent.
+  *
+  * @return this instance, so build calls can be chained
+  */
+ public TableProperty buildTimeSeriesCompactionFileCountThreshold() {
+     String fallback = String.valueOf(
+             PropertyAnalyzer.TIME_SERIES_COMPACTION_FILE_COUNT_THRESHOLD_DEFAULT_VALUE);
+     String raw = properties.getOrDefault(
+             PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_FILE_COUNT_THRESHOLD, fallback);
+     this.timeSeriesCompactionFileCountThreshold = Long.parseLong(raw);
+     return this;
+ }
+
+ /** @return the cached time-series compaction file count threshold */
+ public long timeSeriesCompactionFileCountThreshold() {
+     return this.timeSeriesCompactionFileCountThreshold;
+ }
+
+ /**
+  * Reloads the cached time-series compaction time threshold from the raw
+  * property map, using the default value when the key is absent.
+  *
+  * @return this instance, so build calls can be chained
+  */
+ public TableProperty buildTimeSeriesCompactionTimeThresholdSeconds() {
+     String fallback = String.valueOf(
+             PropertyAnalyzer.TIME_SERIES_COMPACTION_TIME_THRESHOLD_SECONDS_DEFAULT_VALUE);
+     String raw = properties.getOrDefault(
+             PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_TIME_THRESHOLD_SECONDS, fallback);
+     this.timeSeriesCompactionTimeThresholdSeconds = Long.parseLong(raw);
+     return this;
+ }
+
+ /** @return the cached time-series compaction time threshold */
+ public long timeSeriesCompactionTimeThresholdSeconds() {
+     return this.timeSeriesCompactionTimeThresholdSeconds;
+ }
+
public TableProperty buildStoragePolicy() {
storagePolicy =
properties.getOrDefault(PropertyAnalyzer.PROPERTIES_STORAGE_POLICY, "");
return this;
@@ -445,6 +501,10 @@ public class TableProperty implements Writable {
.buildEnableLightSchemaChange()
.buildStoreRowColumn()
.buildSkipWriteIndexOnLoad()
+ .buildCompactionPolicy()
+ .buildTimeSeriesCompactionGoalSizeMbytes()
+ .buildTimeSeriesCompactionFileCountThreshold()
+ .buildTimeSeriesCompactionTimeThresholdSeconds()
.buildDisableAutoCompaction()
.buildEnableSingleReplicaCompaction();
if (Env.getCurrentEnvJournalVersion() < FeMetaVersion.VERSION_105) {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java
b/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java
index 54a1d33df0..66254a52b8 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java
@@ -127,6 +127,16 @@ public class PropertyAnalyzer {
public static final String PROPERTIES_SKIP_WRITE_INDEX_ON_LOAD =
"skip_write_index_on_load";
+ public static final String PROPERTIES_COMPACTION_POLICY =
"compaction_policy";
+
+ public static final String
PROPERTIES_TIME_SERIES_COMPACTION_GOAL_SIZE_MBYTES =
+
"time_series_compaction_goal_size_mbytes";
+
+ public static final String
PROPERTIES_TIME_SERIES_COMPACTION_FILE_COUNT_THRESHOLD =
+
"time_series_compaction_file_count_threshold";
+
+ public static final String
PROPERTIES_TIME_SERIES_COMPACTION_TIME_THRESHOLD_SECONDS =
+
"time_series_compaction_time_threshold_seconds";
public static final String PROPERTIES_MUTABLE = "mutable";
public static final String PROPERTIES_IS_BEING_SYNCED = "is_being_synced";
@@ -152,6 +162,16 @@ public class PropertyAnalyzer {
private static final double MAX_FPP = 0.05;
private static final double MIN_FPP = 0.0001;
+ // compaction policy
+ public static final String SIZE_BASED_COMPACTION_POLICY = "size_based";
+ public static final String TIME_SERIES_COMPACTION_POLICY = "time_series";
+ public static final long
TIME_SERIES_COMPACTION_GOAL_SIZE_MBYTES_DEFAULT_VALUE = 1024;
+ public static final long
TIME_SERIES_COMPACTION_FILE_COUNT_THRESHOLD_DEFAULT_VALUE = 2000;
+ public static final long
TIME_SERIES_COMPACTION_TIME_THRESHOLD_SECONDS_DEFAULT_VALUE = 3600;
+
+
+
+
/**
* check and replace members of DataProperty by properties.
*
@@ -595,6 +615,94 @@ public class PropertyAnalyzer {
+ " must be `true` or `false`");
}
+ /**
+  * Extracts and validates the compaction policy from the given properties.
+  *
+  * <p>The key is removed from {@code properties} when present. A value that
+  * is neither the time-series nor the size-based policy is rejected; this
+  * includes a {@code null} value, which would otherwise be returned as-is
+  * and dereferenced by the caller.
+  *
+  * @param properties mutable property map; may be {@code null} or empty
+  * @return the requested policy, or the size-based policy when the key is absent
+  * @throws AnalysisException if the key is present with an unsupported value
+  */
+ public static String analyzeCompactionPolicy(Map<String, String> properties) throws AnalysisException {
+     if (properties == null || !properties.containsKey(PROPERTIES_COMPACTION_POLICY)) {
+         return SIZE_BASED_COMPACTION_POLICY;
+     }
+     // remove() hands back the mapped value, so no separate get() is needed.
+     String compactionPolicy = properties.remove(PROPERTIES_COMPACTION_POLICY);
+     // Constant-first equals() makes a null value fail validation instead of slipping through.
+     if (!TIME_SERIES_COMPACTION_POLICY.equals(compactionPolicy)
+             && !SIZE_BASED_COMPACTION_POLICY.equals(compactionPolicy)) {
+         throw new AnalysisException(PROPERTIES_COMPACTION_POLICY
+                 + " must be " + TIME_SERIES_COMPACTION_POLICY + " or " + SIZE_BASED_COMPACTION_POLICY);
+     }
+     return compactionPolicy;
+ }
+
+ /**
+  * Extracts and validates the time-series compaction goal size.
+  *
+  * <p>The key is removed from {@code properties} when present.
+  *
+  * @param properties mutable property map; may be {@code null} or empty
+  * @return the parsed value, or the default when the key is absent
+  * @throws AnalysisException if the value is not a valid long or is less than 10
+  */
+ public static long analyzeTimeSeriesCompactionGoalSizeMbytes(Map<String, String> properties)
+         throws AnalysisException {
+     if (properties == null
+             || !properties.containsKey(PROPERTIES_TIME_SERIES_COMPACTION_GOAL_SIZE_MBYTES)) {
+         return TIME_SERIES_COMPACTION_GOAL_SIZE_MBYTES_DEFAULT_VALUE;
+     }
+     String rawValue = properties.remove(PROPERTIES_TIME_SERIES_COMPACTION_GOAL_SIZE_MBYTES);
+     long parsed;
+     try {
+         parsed = Long.parseLong(rawValue);
+     } catch (NumberFormatException e) {
+         throw new AnalysisException("Invalid time_series_compaction_goal_size_mbytes format: "
+                 + rawValue);
+     }
+     // Range check sits outside the try: only the parse can raise NumberFormatException.
+     if (parsed < 10) {
+         throw new AnalysisException("time_series_compaction_goal_size_mbytes can not be"
+                 + " less than 10: " + rawValue);
+     }
+     return parsed;
+ }
+
+ /**
+  * Extracts and validates the time-series compaction file count threshold.
+  *
+  * <p>The key is removed from {@code properties} when present.
+  *
+  * @param properties mutable property map; may be {@code null} or empty
+  * @return the parsed value, or the default when the key is absent
+  * @throws AnalysisException if the value is not a valid long or is less than 10
+  */
+ public static long analyzeTimeSeriesCompactionFileCountThreshold(Map<String, String> properties)
+         throws AnalysisException {
+     if (properties == null
+             || !properties.containsKey(PROPERTIES_TIME_SERIES_COMPACTION_FILE_COUNT_THRESHOLD)) {
+         return TIME_SERIES_COMPACTION_FILE_COUNT_THRESHOLD_DEFAULT_VALUE;
+     }
+     String rawValue = properties.remove(PROPERTIES_TIME_SERIES_COMPACTION_FILE_COUNT_THRESHOLD);
+     long parsed;
+     try {
+         parsed = Long.parseLong(rawValue);
+     } catch (NumberFormatException e) {
+         throw new AnalysisException("Invalid time_series_compaction_file_count_threshold format: "
+                 + rawValue);
+     }
+     // Range check sits outside the try: only the parse can raise NumberFormatException.
+     if (parsed < 10) {
+         throw new AnalysisException("time_series_compaction_file_count_threshold can not be "
+                 + "less than 10: " + rawValue);
+     }
+     return parsed;
+ }
+
+ /**
+  * Extracts and validates the time-series compaction time threshold.
+  *
+  * <p>The key is removed from {@code properties} when present.
+  *
+  * @param properties mutable property map; may be {@code null} or empty
+  * @return the parsed value, or the default when the key is absent
+  * @throws AnalysisException if the value is not a valid long or is less than 60
+  */
+ public static long analyzeTimeSeriesCompactionTimeThresholdSeconds(Map<String, String> properties)
+         throws AnalysisException {
+     if (properties == null
+             || !properties.containsKey(PROPERTIES_TIME_SERIES_COMPACTION_TIME_THRESHOLD_SECONDS)) {
+         return TIME_SERIES_COMPACTION_TIME_THRESHOLD_SECONDS_DEFAULT_VALUE;
+     }
+     String rawValue = properties.remove(PROPERTIES_TIME_SERIES_COMPACTION_TIME_THRESHOLD_SECONDS);
+     long parsed;
+     try {
+         parsed = Long.parseLong(rawValue);
+     } catch (NumberFormatException e) {
+         throw new AnalysisException("Invalid time_series_compaction_time_threshold_seconds format: "
+                 + rawValue);
+     }
+     // Range check sits outside the try: only the parse can raise NumberFormatException.
+     if (parsed < 60) {
+         throw new AnalysisException("time_series_compaction_time_threshold_seconds can not be"
+                 + " less than 60: " + rawValue);
+     }
+     return parsed;
+ }
+
// analyzeCompressionType will parse the compression type from properties
public static TCompressionType analyzeCompressionType(Map<String, String>
properties) throws AnalysisException {
String compressionType = "";
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
index c4166870ad..4a617ebfb3 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
@@ -1435,6 +1435,21 @@ public class InternalCatalog implements
CatalogIf<Database> {
properties.put(PropertyAnalyzer.PROPERTIES_SKIP_WRITE_INDEX_ON_LOAD,
olapTable.skipWriteIndexOnLoad().toString());
}
+ if
(!properties.containsKey(PropertyAnalyzer.PROPERTIES_COMPACTION_POLICY)) {
+ properties.put(PropertyAnalyzer.PROPERTIES_COMPACTION_POLICY,
olapTable.getCompactionPolicy());
+ }
+ if
(!properties.containsKey(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_GOAL_SIZE_MBYTES))
{
+
properties.put(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_GOAL_SIZE_MBYTES,
+
olapTable.getTimeSeriesCompactionGoalSizeMbytes().toString());
+ }
+ if
(!properties.containsKey(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_FILE_COUNT_THRESHOLD))
{
+
properties.put(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_FILE_COUNT_THRESHOLD,
+
olapTable.getTimeSeriesCompactionFileCountThreshold().toString());
+ }
+ if
(!properties.containsKey(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_TIME_THRESHOLD_SECONDS))
{
+
properties.put(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_TIME_THRESHOLD_SECONDS,
+
olapTable.getTimeSeriesCompactionTimeThresholdSeconds().toString());
+ }
if
(!properties.containsKey(PropertyAnalyzer.PROPERTIES_DYNAMIC_SCHEMA)) {
properties.put(PropertyAnalyzer.PROPERTIES_DYNAMIC_SCHEMA,
olapTable.isDynamicSchema().toString());
@@ -1531,7 +1546,11 @@ public class InternalCatalog implements
CatalogIf<Database> {
singlePartitionDesc.getTabletType(),
olapTable.getCompressionType(), olapTable.getDataSortInfo(),
olapTable.getEnableUniqueKeyMergeOnWrite(), storagePolicy,
idGeneratorBuffer,
olapTable.disableAutoCompaction(),
olapTable.enableSingleReplicaCompaction(),
- olapTable.skipWriteIndexOnLoad(),
olapTable.storeRowColumn(), olapTable.isDynamicSchema(),
+ olapTable.skipWriteIndexOnLoad(),
olapTable.getCompactionPolicy(),
+ olapTable.getTimeSeriesCompactionGoalSizeMbytes(),
+ olapTable.getTimeSeriesCompactionFileCountThreshold(),
+ olapTable.getTimeSeriesCompactionTimeThresholdSeconds(),
+ olapTable.storeRowColumn(), olapTable.isDynamicSchema(),
binlogConfig, dataProperty.isStorageMediumSpecified());
// check again
@@ -1759,6 +1778,8 @@ public class InternalCatalog implements
CatalogIf<Database> {
DataSortInfo dataSortInfo, boolean enableUniqueKeyMergeOnWrite,
String storagePolicy,
IdGeneratorBuffer idGeneratorBuffer, boolean disableAutoCompaction,
boolean enableSingleReplicaCompaction, boolean
skipWriteIndexOnLoad,
+ String compactionPolicy, Long timeSeriesCompactionGoalSizeMbytes,
+ Long timeSeriesCompactionFileCountThreshold, Long
timeSeriesCompactionTimeThresholdSeconds,
boolean storeRowColumn, boolean isDynamicSchema, BinlogConfig
binlogConfig,
boolean isStorageMediumSpecified) throws DdlException {
// create base index first.
@@ -1823,6 +1844,8 @@ public class InternalCatalog implements
CatalogIf<Database> {
storageMedium, schema, bfColumns, bfFpp,
countDownLatch, indexes, isInMemory, tabletType,
dataSortInfo, compressionType,
enableUniqueKeyMergeOnWrite, storagePolicy,
disableAutoCompaction,
enableSingleReplicaCompaction, skipWriteIndexOnLoad,
+ compactionPolicy,
timeSeriesCompactionGoalSizeMbytes,
+ timeSeriesCompactionFileCountThreshold,
timeSeriesCompactionTimeThresholdSeconds,
storeRowColumn, isDynamicSchema, binlogConfig);
task.setStorageFormat(storageFormat);
@@ -1996,6 +2019,56 @@ public class InternalCatalog implements
CatalogIf<Database> {
// use light schema change optimization
olapTable.setDisableAutoCompaction(disableAutoCompaction);
+ // set compaction policy
+ String compactionPolicy =
PropertyAnalyzer.SIZE_BASED_COMPACTION_POLICY;
+ try {
+ compactionPolicy =
PropertyAnalyzer.analyzeCompactionPolicy(properties);
+ } catch (AnalysisException e) {
+ throw new DdlException(e.getMessage());
+ }
+ olapTable.setCompactionPolicy(compactionPolicy);
+
+ if
(!compactionPolicy.equals(PropertyAnalyzer.TIME_SERIES_COMPACTION_POLICY)
+ &&
(properties.containsKey(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_GOAL_SIZE_MBYTES)
+ ||
properties.containsKey(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_FILE_COUNT_THRESHOLD)
+ || properties
+
.containsKey(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_TIME_THRESHOLD_SECONDS)))
{
+ throw new DdlException("only time series compaction policy support
for time series config");
+ }
+
+ // set time series compaction goal size
+ long timeSeriesCompactionGoalSizeMbytes
+ =
PropertyAnalyzer.TIME_SERIES_COMPACTION_GOAL_SIZE_MBYTES_DEFAULT_VALUE;
+ try {
+ timeSeriesCompactionGoalSizeMbytes = PropertyAnalyzer
+
.analyzeTimeSeriesCompactionGoalSizeMbytes(properties);
+ } catch (AnalysisException e) {
+ throw new DdlException(e.getMessage());
+ }
+
olapTable.setTimeSeriesCompactionGoalSizeMbytes(timeSeriesCompactionGoalSizeMbytes);
+
+ // set time series compaction file count threshold
+ long timeSeriesCompactionFileCountThreshold
+ =
PropertyAnalyzer.TIME_SERIES_COMPACTION_FILE_COUNT_THRESHOLD_DEFAULT_VALUE;
+ try {
+ timeSeriesCompactionFileCountThreshold = PropertyAnalyzer
+
.analyzeTimeSeriesCompactionFileCountThreshold(properties);
+ } catch (AnalysisException e) {
+ throw new DdlException(e.getMessage());
+ }
+
olapTable.setTimeSeriesCompactionFileCountThreshold(timeSeriesCompactionFileCountThreshold);
+
+ // set time series compaction time threshold
+ long timeSeriesCompactionTimeThresholdSeconds
+ =
PropertyAnalyzer.TIME_SERIES_COMPACTION_TIME_THRESHOLD_SECONDS_DEFAULT_VALUE;
+ try {
+ timeSeriesCompactionTimeThresholdSeconds = PropertyAnalyzer
+
.analyzeTimeSeriesCompactionTimeThresholdSeconds(properties);
+ } catch (AnalysisException e) {
+ throw new DdlException(e.getMessage());
+ }
+
olapTable.setTimeSeriesCompactionTimeThresholdSeconds(timeSeriesCompactionTimeThresholdSeconds);
+
// get storage format
TStorageFormat storageFormat = TStorageFormat.V2; // default is
segment v2
try {
@@ -2322,6 +2395,9 @@ public class InternalCatalog implements
CatalogIf<Database> {
olapTable.getDataSortInfo(),
olapTable.getEnableUniqueKeyMergeOnWrite(), storagePolicy,
idGeneratorBuffer, olapTable.disableAutoCompaction(),
olapTable.enableSingleReplicaCompaction(),
skipWriteIndexOnLoad,
+ olapTable.getCompactionPolicy(),
olapTable.getTimeSeriesCompactionGoalSizeMbytes(),
+ olapTable.getTimeSeriesCompactionFileCountThreshold(),
+
olapTable.getTimeSeriesCompactionTimeThresholdSeconds(),
storeRowColumn, isDynamicSchema, binlogConfigForTask,
partitionInfo.getDataProperty(partitionId).isStorageMediumSpecified());
olapTable.addPartition(partition);
@@ -2388,8 +2464,12 @@ public class InternalCatalog implements
CatalogIf<Database> {
storageFormat,
partitionInfo.getTabletType(entry.getValue()), compressionType,
olapTable.getDataSortInfo(),
olapTable.getEnableUniqueKeyMergeOnWrite(), storagePolicy,
idGeneratorBuffer,
olapTable.disableAutoCompaction(),
- olapTable.enableSingleReplicaCompaction(),
skipWriteIndexOnLoad, storeRowColumn,
- isDynamicSchema, binlogConfigForTask,
dataProperty.isStorageMediumSpecified());
+ olapTable.enableSingleReplicaCompaction(),
skipWriteIndexOnLoad,
+ olapTable.getCompactionPolicy(),
olapTable.getTimeSeriesCompactionGoalSizeMbytes(),
+
olapTable.getTimeSeriesCompactionFileCountThreshold(),
+
olapTable.getTimeSeriesCompactionTimeThresholdSeconds(),
+ storeRowColumn, isDynamicSchema,
binlogConfigForTask,
+ dataProperty.isStorageMediumSpecified());
olapTable.addPartition(partition);
}
} else {
@@ -2812,6 +2892,9 @@ public class InternalCatalog implements
CatalogIf<Database> {
olapTable.getPartitionInfo().getDataProperty(oldPartitionId).getStoragePolicy(),
idGeneratorBuffer, olapTable.disableAutoCompaction(),
olapTable.enableSingleReplicaCompaction(),
olapTable.skipWriteIndexOnLoad(),
+ olapTable.getCompactionPolicy(),
olapTable.getTimeSeriesCompactionGoalSizeMbytes(),
+ olapTable.getTimeSeriesCompactionFileCountThreshold(),
+
olapTable.getTimeSeriesCompactionTimeThresholdSeconds(),
olapTable.storeRowColumn(),
olapTable.isDynamicSchema(), binlogConfig,
copiedTbl.getPartitionInfo().getDataProperty(oldPartitionId).isStorageMediumSpecified());
newPartitions.add(newPartition);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java
b/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java
index 17be05444e..055652a0b9 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java
@@ -809,7 +809,10 @@ public class ReportHandler extends Daemon {
olapTable.getEnableUniqueKeyMergeOnWrite(), olapTable.getStoragePolicy(),
olapTable.disableAutoCompaction(),
olapTable.enableSingleReplicaCompaction(),
- olapTable.skipWriteIndexOnLoad(),
+ olapTable.skipWriteIndexOnLoad(),
olapTable.getCompactionPolicy(),
+
olapTable.getTimeSeriesCompactionGoalSizeMbytes(),
+
olapTable.getTimeSeriesCompactionFileCountThreshold(),
+
olapTable.getTimeSeriesCompactionTimeThresholdSeconds(),
olapTable.storeRowColumn(),
olapTable.isDynamicSchema(),
binlogConfig);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/task/CreateReplicaTask.java
b/fe/fe-core/src/main/java/org/apache/doris/task/CreateReplicaTask.java
index 49bd33fb5c..f654c16784 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/task/CreateReplicaTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/task/CreateReplicaTask.java
@@ -105,6 +105,14 @@ public class CreateReplicaTask extends AgentTask {
private boolean skipWriteIndexOnLoad;
+ private String compactionPolicy;
+
+ private long timeSeriesCompactionGoalSizeMbytes;
+
+ private long timeSeriesCompactionFileCountThreshold;
+
+ private long timeSeriesCompactionTimeThresholdSeconds;
+
private boolean storeRowColumn;
private BinlogConfig binlogConfig;
@@ -123,6 +131,10 @@ public class CreateReplicaTask extends AgentTask {
String storagePolicy, boolean
disableAutoCompaction,
boolean enableSingleReplicaCompaction,
boolean skipWriteIndexOnLoad,
+ String compactionPolicy,
+ long timeSeriesCompactionGoalSizeMbytes,
+ long timeSeriesCompactionFileCountThreshold,
+ long timeSeriesCompactionTimeThresholdSeconds,
boolean storeRowColumn,
boolean isDynamicSchema,
BinlogConfig binlogConfig) {
@@ -162,6 +174,10 @@ public class CreateReplicaTask extends AgentTask {
this.disableAutoCompaction = disableAutoCompaction;
this.enableSingleReplicaCompaction = enableSingleReplicaCompaction;
this.skipWriteIndexOnLoad = skipWriteIndexOnLoad;
+ this.compactionPolicy = compactionPolicy;
+ this.timeSeriesCompactionGoalSizeMbytes =
timeSeriesCompactionGoalSizeMbytes;
+ this.timeSeriesCompactionFileCountThreshold =
timeSeriesCompactionFileCountThreshold;
+ this.timeSeriesCompactionTimeThresholdSeconds =
timeSeriesCompactionTimeThresholdSeconds;
this.storeRowColumn = storeRowColumn;
this.binlogConfig = binlogConfig;
}
@@ -270,7 +286,6 @@ public class CreateReplicaTask extends AgentTask {
tSchema.setDisableAutoCompaction(disableAutoCompaction);
tSchema.setEnableSingleReplicaCompaction(enableSingleReplicaCompaction);
tSchema.setSkipWriteIndexOnLoad(skipWriteIndexOnLoad);
- tSchema.setSkipWriteIndexOnLoad(skipWriteIndexOnLoad);
tSchema.setStoreRowColumn(storeRowColumn);
tSchema.setIsDynamicSchema(isDynamicSchema);
createTabletReq.setTabletSchema(tSchema);
@@ -300,6 +315,10 @@ public class CreateReplicaTask extends AgentTask {
createTabletReq.setTabletType(tabletType);
createTabletReq.setCompressionType(compressionType);
createTabletReq.setEnableUniqueKeyMergeOnWrite(enableUniqueKeyMergeOnWrite);
+ createTabletReq.setCompactionPolicy(compactionPolicy);
+
createTabletReq.setTimeSeriesCompactionGoalSizeMbytes(timeSeriesCompactionGoalSizeMbytes);
+
createTabletReq.setTimeSeriesCompactionFileCountThreshold(timeSeriesCompactionFileCountThreshold);
+
createTabletReq.setTimeSeriesCompactionTimeThresholdSeconds(timeSeriesCompactionTimeThresholdSeconds);
if (binlogConfig != null) {
createTabletReq.setBinlogConfig(binlogConfig.toThrift());
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/task/UpdateTabletMetaInfoTask.java
b/fe/fe-core/src/main/java/org/apache/doris/task/UpdateTabletMetaInfoTask.java
index 29ad3ce199..6b12c34e0d 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/task/UpdateTabletMetaInfoTask.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/task/UpdateTabletMetaInfoTask.java
@@ -21,6 +21,7 @@ import org.apache.doris.catalog.BinlogConfig;
import org.apache.doris.common.MarkedCountDownLatch;
import org.apache.doris.common.Pair;
import org.apache.doris.common.Status;
+import org.apache.doris.common.util.PropertyAnalyzer;
import org.apache.doris.thrift.TStatusCode;
import org.apache.doris.thrift.TTabletMetaInfo;
import org.apache.doris.thrift.TTaskType;
@@ -30,6 +31,7 @@ import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.List;
+import java.util.Map;
import java.util.Random;
import java.util.Set;
@@ -44,6 +46,8 @@ public class UpdateTabletMetaInfoTask extends AgentTask {
private int inMemory = -1; // < 0 means not to update inMemory property, >
0 means true, == 0 means false
private long storagePolicyId = -1; // < 0 means not to update storage
policy, == 0 means to reset storage policy
private BinlogConfig binlogConfig = null; // null means not to update
binlog config
+ private String compactionPolicy = null; // null means not to update
compaction policy
+ private Map<String, Long> timeSeriesCompactionConfig = null; // null means
not to update compaction policy config
// For ReportHandler
private List<TTabletMetaInfo> tabletMetaInfos;
@@ -72,6 +76,22 @@ public class UpdateTabletMetaInfoTask extends AgentTask {
this.tabletMetaInfos = tabletMetaInfos;
}
+ /**
+  * Creates a task that carries table-level meta updates, including the
+  * compaction policy and its time-series options, for one backend.
+  *
+  * @param backendId forwarded to the two-argument constructor
+  * @param tableIdWithSchemaHash forwarded to the two-argument constructor
+  * @param inMemory negative means do not update; zero means false; positive means true
+  * @param storagePolicyId negative means do not update; zero resets the storage policy
+  * @param binlogConfig {@code null} means do not update the binlog config
+  * @param latch latch stored on the task and used by {@code countDownLatch}
+  * @param compactionPolicy {@code null} means do not update the compaction policy
+  * @param timeSeriesCompactionConfig {@code null} means do not update the compaction policy config
+  */
+ public UpdateTabletMetaInfoTask(long backendId,
+                                 Set<Pair<Long, Integer>> tableIdWithSchemaHash,
+                                 int inMemory, long storagePolicyId,
+                                 BinlogConfig binlogConfig,
+                                 MarkedCountDownLatch<Long, Set<Pair<Long, Integer>>> latch,
+                                 String compactionPolicy,
+                                 Map<String, Long> timeSeriesCompactionConfig) {
+     this(backendId, tableIdWithSchemaHash);
+     this.latch = latch;
+     this.inMemory = inMemory;
+     this.storagePolicyId = storagePolicyId;
+     this.binlogConfig = binlogConfig;
+     this.compactionPolicy = compactionPolicy;
+     this.timeSeriesCompactionConfig = timeSeriesCompactionConfig;
+ }
+
public void countDownLatch(long backendId, Set<Pair<Long, Integer>>
tablets) {
if (this.latch != null) {
if (latch.markedCountDown(backendId, tablets)) {
@@ -110,6 +130,26 @@ public class UpdateTabletMetaInfoTask extends AgentTask {
if (binlogConfig != null) {
metaInfo.setBinlogConfig(binlogConfig.toThrift());
}
+ if (compactionPolicy != null) {
+ metaInfo.setCompactionPolicy(compactionPolicy);
+ }
+ if (timeSeriesCompactionConfig != null &&
!timeSeriesCompactionConfig.isEmpty()) {
+ if (timeSeriesCompactionConfig
+
.containsKey(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_GOAL_SIZE_MBYTES))
{
+
metaInfo.setTimeSeriesCompactionGoalSizeMbytes(timeSeriesCompactionConfig
+
.get(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_GOAL_SIZE_MBYTES));
+ }
+ if (timeSeriesCompactionConfig
+
.containsKey(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_FILE_COUNT_THRESHOLD))
{
+
metaInfo.setTimeSeriesCompactionFileCountThreshold(timeSeriesCompactionConfig
+
.get(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_FILE_COUNT_THRESHOLD));
+ }
+ if (timeSeriesCompactionConfig
+
.containsKey(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_TIME_THRESHOLD_SECONDS))
{
+
metaInfo.setTimeSeriesCompactionTimeThresholdSeconds(timeSeriesCompactionConfig
+
.get(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_TIME_THRESHOLD_SECONDS));
+ }
+ }
updateTabletMetaInfoReq.addToTabletMetaInfos(metaInfo);
}
} else {
diff --git a/fe/fe-core/src/test/java/org/apache/doris/task/AgentTaskTest.java
b/fe/fe-core/src/test/java/org/apache/doris/task/AgentTaskTest.java
index cf19d7d918..86d3fda79f 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/task/AgentTaskTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/task/AgentTaskTest.java
@@ -107,7 +107,7 @@ public class AgentTaskTest {
createReplicaTask = new CreateReplicaTask(backendId1, dbId, tableId,
partitionId,
indexId1, tabletId1, replicaId1, shortKeyNum, schemaHash1,
version, KeysType.AGG_KEYS, storageType,
TStorageMedium.SSD, columns, null, 0, latch, null, false,
TTabletType.TABLET_TYPE_DISK, null,
- TCompressionType.LZ4F, false, "", false, false, false, false,
false, null);
+ TCompressionType.LZ4F, false, "", false, false, false, "", 0,
0, 0, false, false, null);
// drop
dropTask = new DropReplicaTask(backendId1, tabletId1, replicaId1,
schemaHash1, false);
diff --git a/gensrc/proto/olap_file.proto b/gensrc/proto/olap_file.proto
index fe894dba9d..7c0c83a4a0 100644
--- a/gensrc/proto/olap_file.proto
+++ b/gensrc/proto/olap_file.proto
@@ -311,6 +311,10 @@ message TabletMetaPB {
optional int64 storage_policy_id = 25;
optional PUniqueId cooldown_meta_id = 26;
optional BinlogConfigPB binlog_config = 27;
+ optional string compaction_policy = 28 [default = "size_based"];
+ optional int64 time_series_compaction_goal_size_mbytes = 29 [default =
1024];
+ optional int64 time_series_compaction_file_count_threshold = 30 [default =
2000];
+ optional int64 time_series_compaction_time_threshold_seconds = 31 [default
= 3600];
}
message OLAPRawDeltaHeaderMessage {
diff --git a/gensrc/thrift/AgentService.thrift
b/gensrc/thrift/AgentService.thrift
index c1d25d5e66..0fec8c6399 100644
--- a/gensrc/thrift/AgentService.thrift
+++ b/gensrc/thrift/AgentService.thrift
@@ -143,6 +143,10 @@ struct TCreateTabletReq {
19: optional bool enable_unique_key_merge_on_write = false
20: optional i64 storage_policy_id
21: optional TBinlogConfig binlog_config
+ 22: optional string compaction_policy = "size_based"
+ 23: optional i64 time_series_compaction_goal_size_mbytes = 1024
+ 24: optional i64 time_series_compaction_file_count_threshold = 2000
+ 25: optional i64 time_series_compaction_time_threshold_seconds = 3600
}
struct TDropTabletReq {
@@ -403,6 +407,10 @@ struct TTabletMetaInfo {
7: optional i64 storage_policy_id
8: optional Types.TReplicaId replica_id
9: optional TBinlogConfig binlog_config
+ 10: optional string compaction_policy
+ 11: optional i64 time_series_compaction_goal_size_mbytes
+ 12: optional i64 time_series_compaction_file_count_threshold
+ 13: optional i64 time_series_compaction_time_threshold_seconds
}
struct TUpdateTabletMetaInfoReq {
diff --git a/regression-test/suites/compaction/test_table_level_compaction_policy.groovy b/regression-test/suites/compaction/test_table_level_compaction_policy.groovy
new file mode 100644
index 0000000000..838d8b34dc
--- /dev/null
+++ b/regression-test/suites/compaction/test_table_level_compaction_policy.groovy
@@ -0,0 +1,207 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.codehaus.groovy.runtime.IOGroovyMethods
+
+suite("test_table_level_compaction_policy") {
+ def tableName = "test_table_level_compaction_policy"
+ sql """ DROP TABLE IF EXISTS ${tableName} """
+
+ sql """
+ CREATE TABLE ${tableName} (
+ `c_custkey` int(11) NOT NULL COMMENT "",
+ `c_name` varchar(26) NOT NULL COMMENT "",
+ `c_address` varchar(41) NOT NULL COMMENT "",
+ `c_city` varchar(11) NOT NULL COMMENT ""
+ )
+ DUPLICATE KEY (`c_custkey`)
+ DISTRIBUTED BY HASH(`c_custkey`) BUCKETS 1
+ PROPERTIES (
+ "replication_num" = "1",
+ "compaction_policy" = "time_series",
+ "time_series_compaction_goal_size_mbytes" = "2048",
+ "time_series_compaction_file_count_threshold" = "5000",
+ "time_series_compaction_time_threshold_seconds" = "86400"
+ );
+ """
+ result = sql """show create table ${tableName}"""
+ logger.info("${result}")
+ assertTrue(result.toString().containsIgnoreCase('"compaction_policy" = "time_series"'))
+ assertTrue(result.toString().containsIgnoreCase('"time_series_compaction_goal_size_mbytes" = "2048"'))
+ assertTrue(result.toString().containsIgnoreCase('"time_series_compaction_file_count_threshold" = "5000"'))
+ assertTrue(result.toString().containsIgnoreCase('"time_series_compaction_time_threshold_seconds" = "86400"'))
+
+ sql """
+ alter table ${tableName} set ("time_series_compaction_goal_size_mbytes" = "1024")
+ """
+
+ result = sql """show create table ${tableName}"""
+ logger.info("${result}")
+ assertTrue(result.toString().containsIgnoreCase('"time_series_compaction_goal_size_mbytes" = "1024"'))
+
+ sql """
+ alter table ${tableName} set ("time_series_compaction_file_count_threshold" = "6000")
+ """
+
+ result = sql """show create table ${tableName}"""
+ logger.info("${result}")
+ assertTrue(result.toString().containsIgnoreCase('"time_series_compaction_file_count_threshold" = "6000"'))
+
+ sql """
+ alter table ${tableName} set ("time_series_compaction_time_threshold_seconds" = "3000")
+ """
+
+ result = sql """show create table ${tableName}"""
+ logger.info("${result}")
+ assertTrue(result.toString().containsIgnoreCase('"time_series_compaction_time_threshold_seconds" = "3000"'))
+
+ sql """ DROP TABLE IF EXISTS ${tableName} """
+
+ sql """
+ CREATE TABLE ${tableName} (
+ `c_custkey` int(11) NOT NULL COMMENT "",
+ `c_name` varchar(26) NOT NULL COMMENT "",
+ `c_address` varchar(41) NOT NULL COMMENT "",
+ `c_city` varchar(11) NOT NULL COMMENT ""
+ )
+ DUPLICATE KEY (`c_custkey`)
+ DISTRIBUTED BY HASH(`c_custkey`) BUCKETS 1
+ PROPERTIES (
+ "replication_num" = "1"
+ );
+ """
+ result = sql """show create table ${tableName}"""
+ logger.info("${result}")
+ assertFalse(result.toString().containsIgnoreCase('"compaction_policy"'))
+
+ sql """ DROP TABLE IF EXISTS ${tableName} """
+
+ test {
+ sql """
+ CREATE TABLE ${tableName} (
+ `c_custkey` int(11) NOT NULL COMMENT "",
+ `c_name` varchar(26) NOT NULL COMMENT "",
+ `c_address` varchar(41) NOT NULL COMMENT "",
+ `c_city` varchar(11) NOT NULL COMMENT ""
+ )
+ DUPLICATE KEY (`c_custkey`)
+ DISTRIBUTED BY HASH(`c_custkey`) BUCKETS 1
+ PROPERTIES (
+ "replication_num" = "1",
+ "compaction_policy" = "time_series",
+ "time_series_compaction_goal_size_mbytes" = "5"
+ );
+ """
+ exception "time_series_compaction_goal_size_mbytes can not be less than 10: 5"
+ }
+
+ test {
+ sql """
+ CREATE TABLE ${tableName} (
+ `c_custkey` int(11) NOT NULL COMMENT "",
+ `c_name` varchar(26) NOT NULL COMMENT "",
+ `c_address` varchar(41) NOT NULL COMMENT "",
+ `c_city` varchar(11) NOT NULL COMMENT ""
+ )
+ DUPLICATE KEY (`c_custkey`)
+ DISTRIBUTED BY HASH(`c_custkey`) BUCKETS 1
+ PROPERTIES (
+ "replication_num" = "1",
+ "compaction_policy" = "time_series",
+ "time_series_compaction_file_count_threshold" = "5"
+ );
+ """
+ exception "time_series_compaction_file_count_threshold can not be less than 10: 5"
+ }
+
+ test {
+ sql """
+ CREATE TABLE ${tableName} (
+ `c_custkey` int(11) NOT NULL COMMENT "",
+ `c_name` varchar(26) NOT NULL COMMENT "",
+ `c_address` varchar(41) NOT NULL COMMENT "",
+ `c_city` varchar(11) NOT NULL COMMENT ""
+ )
+ DUPLICATE KEY (`c_custkey`)
+ DISTRIBUTED BY HASH(`c_custkey`) BUCKETS 1
+ PROPERTIES (
+ "replication_num" = "1",
+ "compaction_policy" = "time_series",
+ "time_series_compaction_time_threshold_seconds" = "5"
+ );
+ """
+ exception "time_series_compaction_time_threshold_seconds can not be less than 60: 5"
+ }
+
+ test {
+ sql """
+ CREATE TABLE ${tableName} (
+ `c_custkey` int(11) NOT NULL COMMENT "",
+ `c_name` varchar(26) NOT NULL COMMENT "",
+ `c_address` varchar(41) NOT NULL COMMENT "",
+ `c_city` varchar(11) NOT NULL COMMENT ""
+ )
+ DUPLICATE KEY (`c_custkey`)
+ DISTRIBUTED BY HASH(`c_custkey`) BUCKETS 1
+ PROPERTIES (
+ "replication_num" = "1",
+ "compaction_policy" = "ok"
+ );
+ """
+ exception "compaction_policy must be time_series or size_based"
+ }
+
+ test {
+ sql """
+ CREATE TABLE ${tableName} (
+ `c_custkey` int(11) NOT NULL COMMENT "",
+ `c_name` varchar(26) NOT NULL COMMENT "",
+ `c_address` varchar(41) NOT NULL COMMENT "",
+ `c_city` varchar(11) NOT NULL COMMENT ""
+ )
+ DUPLICATE KEY (`c_custkey`)
+ DISTRIBUTED BY HASH(`c_custkey`) BUCKETS 1
+ PROPERTIES (
+ "replication_num" = "1",
+ "time_series_compaction_goal_size_mbytes" = "2048"
+ );
+ """
+ exception "only time series compaction policy support for time series config"
+ }
+
+ test {
+ sql """
+ CREATE TABLE ${tableName} (
+ `c_custkey` int(11) NOT NULL COMMENT "",
+ `c_name` varchar(26) NOT NULL COMMENT "",
+ `c_address` varchar(41) NOT NULL COMMENT "",
+ `c_city` varchar(11) NOT NULL COMMENT ""
+ )
+ DUPLICATE KEY (`c_custkey`)
+ DISTRIBUTED BY HASH(`c_custkey`) BUCKETS 1
+ PROPERTIES (
+ "replication_num" = "1"
+ );
+ """
+
+ sql """
+ alter table ${tableName} set ("compaction_policy" = "ok")
+ """
+ exception "Table compaction policy only support for time_series or size_based"
+ }
+ sql """ DROP TABLE IF EXISTS ${tableName} """
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]