This is an automated email from the ASF dual-hosted git repository.

kxiao pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 54282029a0fb170106c92784a7d5dd22be151c8a
Author: Chenyang Sun <[email protected]>
AuthorDate: Tue Aug 1 22:02:23 2023 +0800

    [improvement](compaction) support setting compaction policy and options in table properties (#22461)
---
 be/src/agent/task_worker_pool.cpp                  |  41 ++++
 be/src/common/config.cpp                           |  14 --
 be/src/common/config.h                             |  11 --
 be/src/http/action/compaction_action.cpp           |   4 +-
 be/src/olap/compaction.cpp                         |   6 +-
 be/src/olap/cumulative_compaction_policy.cpp       |   7 +-
 be/src/olap/cumulative_compaction_policy.h         |   5 +-
 .../cumulative_compaction_time_series_policy.cpp   |  27 +--
 .../cumulative_compaction_time_series_policy.h     |   8 +-
 be/src/olap/olap_server.cpp                        |  22 ++-
 be/src/olap/storage_engine.h                       |   4 +-
 be/src/olap/tablet.cpp                             |   5 +-
 be/src/olap/tablet_manager.cpp                     |   7 +-
 be/src/olap/tablet_manager.h                       |   3 +-
 be/src/olap/tablet_meta.cpp                        |  49 ++++-
 be/src/olap/tablet_meta.h                          |  35 +++-
 be/test/olap/cumulative_compaction_policy_test.cpp |   6 +-
 ...mulative_compaction_time_series_policy_test.cpp |  28 +--
 docs/en/docs/admin-manual/config/be-config.md      |  27 ---
 .../Create/CREATE-TABLE.md                         |  31 +++
 docs/zh-CN/docs/admin-manual/config/be-config.md   |  27 ---
 .../Create/CREATE-TABLE.md                         |  30 +++
 .../main/java/org/apache/doris/alter/Alter.java    |  30 ++-
 .../org/apache/doris/alter/AlterOperations.java    |  54 ++++++
 .../java/org/apache/doris/alter/RollupJobV2.java   |   4 +
 .../apache/doris/alter/SchemaChangeHandler.java    | 119 +++++++++++-
 .../org/apache/doris/alter/SchemaChangeJobV2.java  |   4 +
 .../analysis/ModifyTablePropertiesClause.java      |  99 ++++++++++
 .../java/org/apache/doris/backup/RestoreJob.java   |   4 +
 .../main/java/org/apache/doris/catalog/Env.java    |  31 +++
 .../java/org/apache/doris/catalog/OlapTable.java   |  56 ++++++
 .../org/apache/doris/catalog/TableProperty.java    |  60 ++++++
 .../apache/doris/common/util/PropertyAnalyzer.java | 108 +++++++++++
 .../apache/doris/datasource/InternalCatalog.java   |  89 ++++++++-
 .../org/apache/doris/master/ReportHandler.java     |   5 +-
 .../org/apache/doris/task/CreateReplicaTask.java   |  21 ++-
 .../doris/task/UpdateTabletMetaInfoTask.java       |  40 ++++
 .../java/org/apache/doris/task/AgentTaskTest.java  |   2 +-
 gensrc/proto/olap_file.proto                       |   4 +
 gensrc/thrift/AgentService.thrift                  |   8 +
 .../test_table_level_compaction_policy.groovy      | 207 +++++++++++++++++++++
 41 files changed, 1198 insertions(+), 144 deletions(-)
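
For reviewers, a minimal usage sketch of the table-level properties introduced by
this patch (the table name, columns and values below are hypothetical; the
property names come from the CREATE-TABLE docs and regression test changed in
this commit):

    -- create a table that uses the time_series compaction policy
    CREATE TABLE sensor_log (
        ts DATETIME,
        device_id BIGINT,
        metric DOUBLE
    )
    DUPLICATE KEY(ts, device_id)
    DISTRIBUTED BY HASH(device_id) BUCKETS 10
    PROPERTIES (
        "replication_num" = "1",
        "compaction_policy" = "time_series",
        "time_series_compaction_goal_size_mbytes" = "1024",
        "time_series_compaction_file_count_threshold" = "2000",
        "time_series_compaction_time_threshold_seconds" = "3600"
    );

    -- switch the policy or tune a single threshold on an existing table
    ALTER TABLE sensor_log SET ("compaction_policy" = "time_series");
    ALTER TABLE sensor_log SET ("time_series_compaction_goal_size_mbytes" = "2048");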

diff --git a/be/src/agent/task_worker_pool.cpp b/be/src/agent/task_worker_pool.cpp
index 1c44f30548..8d90c8b1b0 100644
--- a/be/src/agent/task_worker_pool.cpp
+++ b/be/src/agent/task_worker_pool.cpp
@@ -433,6 +433,47 @@ void TaskWorkerPool::_update_tablet_meta_worker_thread_callback() {
                 tablet->tablet_schema_unlocked()->set_is_in_memory(tablet_meta_info.is_in_memory);
                 need_to_save = true;
             }
+            if (tablet_meta_info.__isset.compaction_policy) {
+                if (tablet_meta_info.compaction_policy != "size_based" &&
+                    tablet_meta_info.compaction_policy != "time_series") {
+                    status = Status::InvalidArgument(
+                            "invalid compaction policy, only support for size_based or "
+                            "time_series");
+                    continue;
+                }
+                tablet->tablet_meta()->set_compaction_policy(tablet_meta_info.compaction_policy);
+                need_to_save = true;
+            }
+            if (tablet_meta_info.__isset.time_series_compaction_goal_size_mbytes) {
+                if (tablet->tablet_meta()->compaction_policy() != "time_series") {
+                    status = Status::InvalidArgument(
+                            "only time series compaction policy support time series config");
+                    continue;
+                }
+                tablet->tablet_meta()->set_time_series_compaction_goal_size_mbytes(
+                        tablet_meta_info.time_series_compaction_goal_size_mbytes);
+                need_to_save = true;
+            }
+            if (tablet_meta_info.__isset.time_series_compaction_file_count_threshold) {
+                if (tablet->tablet_meta()->compaction_policy() != "time_series") {
+                    status = Status::InvalidArgument(
+                            "only time series compaction policy support time series config");
+                    continue;
+                }
+                tablet->tablet_meta()->set_time_series_compaction_file_count_threshold(
+                        tablet_meta_info.time_series_compaction_file_count_threshold);
+                need_to_save = true;
+            }
+            if (tablet_meta_info.__isset.time_series_compaction_time_threshold_seconds) {
+                if (tablet->tablet_meta()->compaction_policy() != "time_series") {
+                    status = Status::InvalidArgument(
+                            "only time series compaction policy support time series config");
+                    continue;
+                }
+                tablet->tablet_meta()->set_time_series_compaction_time_threshold_seconds(
+                        tablet_meta_info.time_series_compaction_time_threshold_seconds);
+                need_to_save = true;
+            }
             if (tablet_meta_info.__isset.replica_id) {
                 tablet->tablet_meta()->set_replica_id(tablet_meta_info.replica_id);
             }
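
The checks added above only accept "size_based" or "time_series", and they apply
the time_series_* settings only when the tablet's policy is already
"time_series". A hedged SQL illustration of the behavior this enforces (table
name is hypothetical):

    -- rejected: the policy must be size_based or time_series
    ALTER TABLE sensor_log SET ("compaction_policy" = "invalid_policy");

    -- the threshold is only accepted once the policy is time_series
    ALTER TABLE sensor_log SET ("compaction_policy" = "time_series");
    ALTER TABLE sensor_log SET ("time_series_compaction_file_count_threshold" = "5000");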
diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 87d1a6ccc5..912fae1b5c 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -973,20 +973,6 @@ DEFINE_Int32(num_broadcast_buffer, "32");
 // semi-structure configs
 DEFINE_Bool(enable_parse_multi_dimession_array, "false");
 
-// Currently, two compaction strategies are implemented, SIZE_BASED and TIME_SERIES.
-// In the case of time series compaction, the execution of compaction is adjusted
-// using parameters that have the prefix time_series_compaction.
-DEFINE_mString(compaction_policy, "size_based");
-DEFINE_Validator(compaction_policy, [](const std::string config) -> bool {
-    return config == "size_based" || config == "time_series";
-});
-// the size of input files for each compaction
-DEFINE_mInt64(time_series_compaction_goal_size_mbytes, "512");
-// the minimum number of input files for each compaction if time_series_compaction_goal_size_mbytes not meets
-DEFINE_mInt64(time_series_compaction_file_count_threshold, "2000");
-// if compaction has not been performed within 3600 seconds, a compaction will be triggered
-DEFINE_mInt64(time_series_compaction_time_threshold_seconds, "3600");
-
 // max depth of expression tree allowed.
 DEFINE_Int32(max_depth_of_expr_tree, "600");
 
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 0fd5c648d7..c38badffe7 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -1005,17 +1005,6 @@ DECLARE_Int32(num_broadcast_buffer);
 // semi-structure configs
 DECLARE_Bool(enable_parse_multi_dimession_array);
 
-// Currently, two compaction strategies are implemented, SIZE_BASED and TIME_SERIES.
-// In the case of time series compaction, the execution of compaction is adjusted
-// using parameters that have the prefix time_series_compaction.
-DECLARE_mString(compaction_policy);
-// the size of input files for each compaction
-DECLARE_mInt64(time_series_compaction_goal_size_mbytes);
-// the minimum number of input files for each compaction if time_series_compaction_goal_size_mbytes not meets
-DECLARE_mInt64(time_series_compaction_file_count_threshold);
-// if compaction has not been performed within 3600 seconds, a compaction will be triggered
-DECLARE_mInt64(time_series_compaction_time_threshold_seconds);
-
 // max depth of expression tree allowed.
 DECLARE_Int32(max_depth_of_expr_tree);
 
diff --git a/be/src/http/action/compaction_action.cpp b/be/src/http/action/compaction_action.cpp
index 81f6ad4d30..d94ca60432 100644
--- a/be/src/http/action/compaction_action.cpp
+++ b/be/src/http/action/compaction_action.cpp
@@ -37,6 +37,7 @@
 #include "olap/base_compaction.h"
 #include "olap/cumulative_compaction.h"
 #include "olap/cumulative_compaction_policy.h"
+#include "olap/cumulative_compaction_time_series_policy.h"
 #include "olap/full_compaction.h"
 #include "olap/olap_define.h"
 #include "olap/storage_engine.h"
@@ -198,7 +199,8 @@ Status CompactionAction::_execute_compaction_callback(TabletSharedPtr tablet,
     timer.start();
 
     std::shared_ptr<CumulativeCompactionPolicy> cumulative_compaction_policy =
-            CumulativeCompactionPolicyFactory::create_cumulative_compaction_policy();
+            CumulativeCompactionPolicyFactory::create_cumulative_compaction_policy(
+                    tablet->tablet_meta()->compaction_policy());
     if (tablet->get_cumulative_compaction_policy() == nullptr) {
         tablet->set_cumulative_compaction_policy(cumulative_compaction_policy);
     }
diff --git a/be/src/olap/compaction.cpp b/be/src/olap/compaction.cpp
index d67248919f..2f24fbd071 100644
--- a/be/src/olap/compaction.cpp
+++ b/be/src/olap/compaction.cpp
@@ -148,8 +148,10 @@ int64_t Compaction::get_avg_segment_rows() {
     // input_rowsets_size is total disk_size of input_rowset, this size is the
     // final size after codec and compress, so expect dest segment file size
     // in disk is config::vertical_compaction_max_segment_size
-    if (config::compaction_policy == CUMULATIVE_TIME_SERIES_POLICY) {
-        return (config::time_series_compaction_goal_size_mbytes * 1024 * 1024 * 2) /
+    const auto& meta = _tablet->tablet_meta();
+    if (meta->compaction_policy() == CUMULATIVE_TIME_SERIES_POLICY) {
+        int64_t compaction_goal_size_mbytes = meta->time_series_compaction_goal_size_mbytes();
+        return (compaction_goal_size_mbytes * 1024 * 1024 * 2) /
                (_input_rowsets_size / (_input_row_num + 1) + 1);
     }
     return config::vertical_compaction_max_segment_size /
diff --git a/be/src/olap/cumulative_compaction_policy.cpp b/be/src/olap/cumulative_compaction_policy.cpp
index 17773e831e..c3d5047cec 100644
--- a/be/src/olap/cumulative_compaction_policy.cpp
+++ b/be/src/olap/cumulative_compaction_policy.cpp
@@ -368,9 +368,12 @@ int64_t SizeBasedCumulativeCompactionPolicy::_level_size(const int64_t size) {
 }
 
 std::shared_ptr<CumulativeCompactionPolicy>
-CumulativeCompactionPolicyFactory::create_cumulative_compaction_policy() {
-    if (config::compaction_policy == CUMULATIVE_TIME_SERIES_POLICY) {
+CumulativeCompactionPolicyFactory::create_cumulative_compaction_policy(
+        const std::string_view& compaction_policy) {
+    if (compaction_policy == CUMULATIVE_TIME_SERIES_POLICY) {
         return std::make_shared<TimeSeriesCumulativeCompactionPolicy>();
+    } else if (compaction_policy == CUMULATIVE_SIZE_BASED_POLICY) {
+        return std::make_shared<SizeBasedCumulativeCompactionPolicy>();
     }
     return std::make_shared<SizeBasedCumulativeCompactionPolicy>();
 }
diff --git a/be/src/olap/cumulative_compaction_policy.h b/be/src/olap/cumulative_compaction_policy.h
index 65f970e4af..836100903b 100644
--- a/be/src/olap/cumulative_compaction_policy.h
+++ b/be/src/olap/cumulative_compaction_policy.h
@@ -33,7 +33,7 @@ namespace doris {
 class Tablet;
 struct Version;
 
-inline std::string_view CUMULATIVE_SIZE_BASED_POLICY = "size_based";
+inline constexpr std::string_view CUMULATIVE_SIZE_BASED_POLICY = "size_based";
 
 /// This class CumulativeCompactionPolicy is the base class of cumulative compaction policy.
 /// It defines the policy to do cumulative compaction. It has different derived classes, which implements
@@ -176,7 +176,8 @@ class CumulativeCompactionPolicyFactory {
 public:
     /// Static factory function. It can product different policy according to the `policy` parameter and use tablet ptr
     /// to construct the policy. Now it can product size based and num based policies.
-    static std::shared_ptr<CumulativeCompactionPolicy> create_cumulative_compaction_policy();
+    static std::shared_ptr<CumulativeCompactionPolicy> create_cumulative_compaction_policy(
+            const std::string_view& compaction_policy);
 };
 
 } // namespace doris
diff --git a/be/src/olap/cumulative_compaction_time_series_policy.cpp b/be/src/olap/cumulative_compaction_time_series_policy.cpp
index ac816d0e54..1e144b6a2c 100644
--- a/be/src/olap/cumulative_compaction_time_series_policy.cpp
+++ b/be/src/olap/cumulative_compaction_time_series_policy.cpp
@@ -63,13 +63,15 @@ uint32_t TimeSeriesCumulativeCompactionPolicy::calc_cumulative_compaction_score(
         return 0;
     }
 
-    // Condition 1: the size of input files for compaction meets the requirement of parameter _compaction_goal_size
-    if (total_size >= (config::time_series_compaction_goal_size_mbytes * 1024 * 1024)) {
+    // Condition 1: the size of input files for compaction meets the requirement of parameter compaction_goal_size
+    int64_t compaction_goal_size_mbytes =
+            tablet->tablet_meta()->time_series_compaction_goal_size_mbytes();
+    if (total_size >= compaction_goal_size_mbytes * 1024 * 1024) {
         return score;
     }
 
-    // Condition 2: the number of input files reaches the threshold specified by parameter _compaction_file_count_threshold
-    if (score >= config::time_series_compaction_file_count_threshold) {
+    // Condition 2: the number of input files reaches the threshold specified by parameter compaction_file_count_threshold
+    if (score >= tablet->tablet_meta()->time_series_compaction_file_count_threshold()) {
         return score;
     }
 
@@ -79,7 +81,8 @@ uint32_t TimeSeriesCumulativeCompactionPolicy::calc_cumulative_compaction_score(
         int64_t cumu_interval = now - last_cumu;
 
         // Condition 3: the time interval between compactions exceeds the value specified by parameter _compaction_time_threshold_second
-        if (cumu_interval > (config::time_series_compaction_time_threshold_seconds * 1000)) {
+        if (cumu_interval >
+            (tablet->tablet_meta()->time_series_compaction_time_threshold_seconds() * 1000)) {
             return score;
         }
     } else if (score > 0) {
@@ -199,8 +202,9 @@ int TimeSeriesCumulativeCompactionPolicy::pick_input_rowsets(
         transient_size += 1;
         input_rowsets->push_back(rowset);
 
-        // Condition 1: the size of input files for compaction meets the requirement of parameter _compaction_goal_size
-        if (total_size >= (config::time_series_compaction_goal_size_mbytes * 1024 * 1024)) {
+        // Condition 1: the size of input files for compaction meets the requirement of parameter compaction_goal_size
+        if (total_size >=
+            (tablet->tablet_meta()->time_series_compaction_goal_size_mbytes() * 1024 * 1024)) {
             if (input_rowsets->size() == 1 &&
                 !input_rowsets->front()->rowset_meta()->is_segments_overlapping()) {
                 // Only 1 non-overlapping rowset, skip it
@@ -225,8 +229,8 @@ int TimeSeriesCumulativeCompactionPolicy::pick_input_rowsets(
         return transient_size;
     }
 
-    // Condition 2: the number of input files reaches the threshold specified by parameter _compaction_file_count_threshold
-    if (*compaction_score >= config::time_series_compaction_file_count_threshold) {
+    // Condition 2: the number of input files reaches the threshold specified by parameter compaction_file_count_threshold
+    if (*compaction_score >= tablet->tablet_meta()->time_series_compaction_file_count_threshold()) {
         return transient_size;
     }
 
@@ -235,8 +239,9 @@ int TimeSeriesCumulativeCompactionPolicy::pick_input_rowsets(
     if (last_cumu != 0) {
         int64_t cumu_interval = now - last_cumu;
 
-        // Condition 3: the time interval between compactions exceeds the value specified by parameter _compaction_time_threshold_second
-        if (cumu_interval > (config::time_series_compaction_time_threshold_seconds * 1000)) {
+        // Condition 3: the time interval between compactions exceeds the value specified by parameter compaction_time_threshold_second
+        if (cumu_interval >
+            (tablet->tablet_meta()->time_series_compaction_time_threshold_seconds() * 1000)) {
             return transient_size;
         }
     }
diff --git a/be/src/olap/cumulative_compaction_time_series_policy.h b/be/src/olap/cumulative_compaction_time_series_policy.h
index 3fc3362fa8..1bae5ecfb1 100644
--- a/be/src/olap/cumulative_compaction_time_series_policy.h
+++ b/be/src/olap/cumulative_compaction_time_series_policy.h
@@ -21,13 +21,13 @@
 
 namespace doris {
 
-inline std::string_view CUMULATIVE_TIME_SERIES_POLICY = "time_series";
+inline constexpr std::string_view CUMULATIVE_TIME_SERIES_POLICY = "time_series";
 
 /// TimeSeries cumulative compaction policy implementation.
 /// The following three conditions will be considered when calculating compaction scores and selecting input rowsets in this policy:
-/// Condition 1: the size of input files for compaction meets the requirement of parameter _compaction_goal_size
-/// Condition 2: the number of input files reaches the threshold specified by parameter _compaction_file_count_threshold
-/// Condition 3: the time interval between compactions exceeds the value specified by parameter _compaction_time_threshold_seconds
+/// Condition 1: the size of input files for compaction meets the requirement of time_series_compaction_goal_size_mbytes
+/// Condition 2: the number of input files reaches the threshold specified by time_series_compaction_file_count_threshold
+/// Condition 3: the time interval between compactions exceeds the value specified by time_series_compaction_time_threshold_seconds
 /// The conditions are evaluated sequentially, starting with Condition 1.
 /// If any condition is met, the compaction score calculation or selection of input rowsets will be successful.
 class TimeSeriesCumulativeCompactionPolicy final : public CumulativeCompactionPolicy {
diff --git a/be/src/olap/olap_server.cpp b/be/src/olap/olap_server.cpp
index dab16000c0..705d803929 100644
--- a/be/src/olap/olap_server.cpp
+++ b/be/src/olap/olap_server.cpp
@@ -52,6 +52,7 @@
 #include "olap/cold_data_compaction.h"
 #include "olap/compaction_permit_limiter.h"
 #include "olap/cumulative_compaction_policy.h"
+#include "olap/cumulative_compaction_time_series_policy.h"
 #include "olap/data_dir.h"
 #include "olap/olap_common.h"
 #include "olap/rowset/beta_rowset_writer.h"
@@ -834,7 +835,7 @@ std::vector<TabletSharedPtr> StorageEngine::_generate_compaction_tasks(
                     compaction_type == CompactionType::CUMULATIVE_COMPACTION
                             ? copied_cumu_map[data_dir]
                             : copied_base_map[data_dir],
-                    &disk_max_score, _cumulative_compaction_policy);
+                    &disk_max_score, _cumulative_compaction_policies);
             if (tablet != nullptr) {
                 if (!tablet->tablet_meta()->tablet_schema()->disable_auto_compaction()) {
                     if (need_pick_tablet) {
@@ -864,9 +865,13 @@ std::vector<TabletSharedPtr> StorageEngine::_generate_compaction_tasks(
 }
 
 void StorageEngine::_update_cumulative_compaction_policy() {
-    if (_cumulative_compaction_policy == nullptr) {
-        _cumulative_compaction_policy =
-                CumulativeCompactionPolicyFactory::create_cumulative_compaction_policy();
+    if (_cumulative_compaction_policies.empty()) {
+        _cumulative_compaction_policies[CUMULATIVE_SIZE_BASED_POLICY] =
+                CumulativeCompactionPolicyFactory::create_cumulative_compaction_policy(
+                        CUMULATIVE_SIZE_BASED_POLICY);
+        _cumulative_compaction_policies[CUMULATIVE_TIME_SERIES_POLICY] =
+                CumulativeCompactionPolicyFactory::create_cumulative_compaction_policy(
+                        CUMULATIVE_TIME_SERIES_POLICY);
     }
 }
 
@@ -976,8 +981,13 @@ Status StorageEngine::_submit_compaction_task(TabletSharedPtr tablet,
 Status StorageEngine::submit_compaction_task(TabletSharedPtr tablet, CompactionType compaction_type,
                                              bool force) {
     _update_cumulative_compaction_policy();
-    if (tablet->get_cumulative_compaction_policy() == nullptr) {
-        tablet->set_cumulative_compaction_policy(_cumulative_compaction_policy);
+    // alter table tableName set ("compaction_policy"="time_series")
+    // if the table's compaction policy is altered, we need to update the tablet's compaction policy shared_ptr
+    if (tablet->get_cumulative_compaction_policy() == nullptr ||
+        tablet->get_cumulative_compaction_policy()->name() !=
+                tablet->tablet_meta()->compaction_policy()) {
+        tablet->set_cumulative_compaction_policy(
+                _cumulative_compaction_policies.at(tablet->tablet_meta()->compaction_policy()));
     }
     tablet->set_skip_compaction(false);
     return _submit_compaction_task(tablet, compaction_type, force);
diff --git a/be/src/olap/storage_engine.h b/be/src/olap/storage_engine.h
index 585f26902a..25f565f031 100644
--- a/be/src/olap/storage_engine.h
+++ b/be/src/olap/storage_engine.h
@@ -460,7 +460,9 @@ private:
 
     std::shared_ptr<StreamLoadRecorder> _stream_load_recorder;
 
-    std::shared_ptr<CumulativeCompactionPolicy> _cumulative_compaction_policy;
+    // we use an unordered_map to store the shared_ptr of every cumulative compaction policy
+    std::unordered_map<std::string_view, std::shared_ptr<CumulativeCompactionPolicy>>
+            _cumulative_compaction_policies;
 
     scoped_refptr<Thread> _cooldown_tasks_producer_thread;
     scoped_refptr<Thread> _remove_unused_remote_files_thread;
diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp
index 19c8a8467e..13a887c429 100644
--- a/be/src/olap/tablet.cpp
+++ b/be/src/olap/tablet.cpp
@@ -284,7 +284,8 @@ Status Tablet::_init_once_action() {
 #ifdef BE_TEST
     // init cumulative compaction policy by type
     _cumulative_compaction_policy =
-            CumulativeCompactionPolicyFactory::create_cumulative_compaction_policy();
+            CumulativeCompactionPolicyFactory::create_cumulative_compaction_policy(
+                    _tablet_meta->compaction_policy());
 #endif
 
     RowsetVector rowset_vec;
@@ -1062,7 +1063,7 @@ uint32_t Tablet::_calc_base_compaction_score() const {
 
     // In the time series compaction policy, we want the base compaction to be triggered
     // when there are delete versions present.
-    if (config::compaction_policy == CUMULATIVE_TIME_SERIES_POLICY) {
+    if (_tablet_meta->compaction_policy() == CUMULATIVE_TIME_SERIES_POLICY) {
         return (base_rowset_exist && has_delete) ? score : 0;
     }
 
diff --git a/be/src/olap/tablet_manager.cpp b/be/src/olap/tablet_manager.cpp
index 8b68e62406..36d19cf8ed 100644
--- a/be/src/olap/tablet_manager.cpp
+++ b/be/src/olap/tablet_manager.cpp
@@ -40,6 +40,7 @@
 #include "gutil/strings/strcat.h"
 #include "gutil/strings/substitute.h"
 #include "io/fs/local_file_system.h"
+#include "olap/cumulative_compaction_time_series_policy.h"
 #include "olap/data_dir.h"
 #include "olap/olap_common.h"
 #include "olap/olap_define.h"
@@ -678,7 +679,8 @@ void TabletManager::get_tablet_stat(TTabletStatResult* result) {
 TabletSharedPtr TabletManager::find_best_tablet_to_compaction(
         CompactionType compaction_type, DataDir* data_dir,
         const std::unordered_set<TTabletId>& tablet_submitted_compaction, uint32_t* score,
-        std::shared_ptr<CumulativeCompactionPolicy> cumulative_compaction_policy) {
+        const std::unordered_map<std::string_view, std::shared_ptr<CumulativeCompactionPolicy>>&
+                all_cumulative_compaction_policies) {
     int64_t now_ms = UnixMillis();
     const string& compaction_type_str =
             compaction_type == CompactionType::BASE_COMPACTION ? "base" : "cumulative";
@@ -729,7 +731,8 @@ TabletSharedPtr TabletManager::find_best_tablet_to_compaction(
                     continue;
                 }
             }
-
+            auto cumulative_compaction_policy = all_cumulative_compaction_policies.at(
+                    tablet_ptr->tablet_meta()->compaction_policy());
             uint32_t current_compaction_score = tablet_ptr->calc_compaction_score(
                     compaction_type, cumulative_compaction_policy);
             if (current_compaction_score < 5) {
diff --git a/be/src/olap/tablet_manager.h b/be/src/olap/tablet_manager.h
index f7f41dcef1..2371da868a 100644
--- a/be/src/olap/tablet_manager.h
+++ b/be/src/olap/tablet_manager.h
@@ -77,7 +77,8 @@ public:
     TabletSharedPtr find_best_tablet_to_compaction(
             CompactionType compaction_type, DataDir* data_dir,
             const std::unordered_set<TTabletId>& tablet_submitted_compaction, uint32_t* score,
-            std::shared_ptr<CumulativeCompactionPolicy> cumulative_compaction_policy);
+            const std::unordered_map<std::string_view, std::shared_ptr<CumulativeCompactionPolicy>>&
+                    all_cumulative_compaction_policies);
 
     TabletSharedPtr get_tablet(TTabletId tablet_id, bool include_deleted = false,
                                std::string* err = nullptr);
diff --git a/be/src/olap/tablet_meta.cpp b/be/src/olap/tablet_meta.cpp
index 01d206ebe9..612c50280d 100644
--- a/be/src/olap/tablet_meta.cpp
+++ b/be/src/olap/tablet_meta.cpp
@@ -65,7 +65,10 @@ Status TabletMeta::create(const TCreateTabletReq& request, const TabletUid& tabl
             request.__isset.enable_unique_key_merge_on_write
                     ? request.enable_unique_key_merge_on_write
                     : false,
-            std::move(binlog_config));
+            std::move(binlog_config), request.compaction_policy,
+            request.time_series_compaction_goal_size_mbytes,
+            request.time_series_compaction_file_count_threshold,
+            request.time_series_compaction_time_threshold_seconds);
     return Status::OK();
 }
 
@@ -81,7 +84,10 @@ TabletMeta::TabletMeta(int64_t table_id, int64_t partition_id, int64_t tablet_id
                        TabletUid tablet_uid, TTabletType::type tabletType,
                        TCompressionType::type compression_type, int64_t storage_policy_id,
                        bool enable_unique_key_merge_on_write,
-                       std::optional<TBinlogConfig> binlog_config)
+                       std::optional<TBinlogConfig> binlog_config, std::string compaction_policy,
+                       int64_t time_series_compaction_goal_size_mbytes,
+                       int64_t time_series_compaction_file_count_threshold,
+                       int64_t time_series_compaction_time_threshold_seconds)
         : _tablet_uid(0, 0),
           _schema(new TabletSchema),
           _delete_bitmap(new DeleteBitmap(tablet_id)) {
@@ -102,6 +108,13 @@ TabletMeta::TabletMeta(int64_t table_id, int64_t partition_id, int64_t tablet_id
                                            : TabletTypePB::TABLET_TYPE_MEMORY);
     tablet_meta_pb.set_enable_unique_key_merge_on_write(enable_unique_key_merge_on_write);
     tablet_meta_pb.set_storage_policy_id(storage_policy_id);
+    tablet_meta_pb.set_compaction_policy(compaction_policy);
+    tablet_meta_pb.set_time_series_compaction_goal_size_mbytes(
+            time_series_compaction_goal_size_mbytes);
+    tablet_meta_pb.set_time_series_compaction_file_count_threshold(
+            time_series_compaction_file_count_threshold);
+    tablet_meta_pb.set_time_series_compaction_time_threshold_seconds(
+            time_series_compaction_time_threshold_seconds);
     TabletSchemaPB* schema = tablet_meta_pb.mutable_schema();
     schema->set_num_short_key_columns(tablet_schema.short_key_column_count);
     schema->set_num_rows_per_row_block(config::default_num_rows_per_column_file_block);
@@ -266,7 +279,6 @@ TabletMeta::TabletMeta(int64_t table_id, int64_t partition_id, int64_t tablet_id
     if (tablet_schema.__isset.skip_write_index_on_load) {
         schema->set_skip_write_index_on_load(tablet_schema.skip_write_index_on_load);
     }
-
     if (binlog_config.has_value()) {
         BinlogConfig tmp_binlog_config;
         tmp_binlog_config = binlog_config.value();
@@ -298,7 +310,13 @@ TabletMeta::TabletMeta(const TabletMeta& b)
           _cooldown_meta_id(b._cooldown_meta_id),
           _enable_unique_key_merge_on_write(b._enable_unique_key_merge_on_write),
           _delete_bitmap(b._delete_bitmap),
-          _binlog_config(b._binlog_config) {};
+          _binlog_config(b._binlog_config),
+          _compaction_policy(b._compaction_policy),
+          _time_series_compaction_goal_size_mbytes(b._time_series_compaction_goal_size_mbytes),
+          _time_series_compaction_file_count_threshold(
+                  b._time_series_compaction_file_count_threshold),
+          _time_series_compaction_time_threshold_seconds(
+                  b._time_series_compaction_time_threshold_seconds) {};
 
 void TabletMeta::init_column_from_tcolumn(uint32_t unique_id, const TColumn& tcolumn,
                                           ColumnPB* column) {
@@ -562,6 +580,13 @@ void TabletMeta::init_from_pb(const TabletMetaPB& tablet_meta_pb) {
     if (tablet_meta_pb.has_binlog_config()) {
         _binlog_config = tablet_meta_pb.binlog_config();
     }
+    _compaction_policy = tablet_meta_pb.compaction_policy();
+    _time_series_compaction_goal_size_mbytes =
+            tablet_meta_pb.time_series_compaction_goal_size_mbytes();
+    _time_series_compaction_file_count_threshold =
+            tablet_meta_pb.time_series_compaction_file_count_threshold();
+    _time_series_compaction_time_threshold_seconds =
+            tablet_meta_pb.time_series_compaction_time_threshold_seconds();
 }
 
 void TabletMeta::to_meta_pb(TabletMetaPB* tablet_meta_pb) {
@@ -636,6 +661,13 @@ void TabletMeta::to_meta_pb(TabletMetaPB* tablet_meta_pb) {
         }
     }
     _binlog_config.to_pb(tablet_meta_pb->mutable_binlog_config());
+    tablet_meta_pb->set_compaction_policy(compaction_policy());
+    tablet_meta_pb->set_time_series_compaction_goal_size_mbytes(
+            time_series_compaction_goal_size_mbytes());
+    tablet_meta_pb->set_time_series_compaction_file_count_threshold(
+            time_series_compaction_file_count_threshold());
+    tablet_meta_pb->set_time_series_compaction_time_threshold_seconds(
+            time_series_compaction_time_threshold_seconds());
 }
 
 uint32_t TabletMeta::mem_size() const {
@@ -860,6 +892,15 @@ bool operator==(const TabletMeta& a, const TabletMeta& b) {
     if (a._in_restore_mode != b._in_restore_mode) return false;
     if (a._preferred_rowset_type != b._preferred_rowset_type) return false;
     if (a._storage_policy_id != b._storage_policy_id) return false;
+    if (a._compaction_policy != b._compaction_policy) return false;
+    if (a._time_series_compaction_goal_size_mbytes != b._time_series_compaction_goal_size_mbytes)
+        return false;
+    if (a._time_series_compaction_file_count_threshold !=
+        b._time_series_compaction_file_count_threshold)
+        return false;
+    if (a._time_series_compaction_time_threshold_seconds !=
+        b._time_series_compaction_time_threshold_seconds)
+        return false;
     return true;
 }
 
diff --git a/be/src/olap/tablet_meta.h b/be/src/olap/tablet_meta.h
index 70830b82de..c9ddff2417 100644
--- a/be/src/olap/tablet_meta.h
+++ b/be/src/olap/tablet_meta.h
@@ -106,7 +106,11 @@ public:
                TabletUid tablet_uid, TTabletType::type tabletType,
                TCompressionType::type compression_type, int64_t storage_policy_id = 0,
                bool enable_unique_key_merge_on_write = false,
-               std::optional<TBinlogConfig> binlog_config = {});
+               std::optional<TBinlogConfig> binlog_config = {},
+               std::string compaction_policy = "size_based",
+               int64_t time_series_compaction_goal_size_mbytes = 1024,
+               int64_t time_series_compaction_file_count_threshold = 2000,
+               int64_t time_series_compaction_time_threshold_seconds = 3600);
     // If need add a filed in TableMeta, filed init copy in copy construct function
     TabletMeta(const TabletMeta& tablet_meta);
     TabletMeta(TabletMeta&& tablet_meta) = delete;
@@ -228,6 +232,29 @@ public:
         _binlog_config = std::move(binlog_config);
     }
 
+    void set_compaction_policy(std::string compaction_policy) {
+        _compaction_policy = compaction_policy;
+    }
+    std::string compaction_policy() const { return _compaction_policy; }
+    void set_time_series_compaction_goal_size_mbytes(int64_t goal_size_mbytes) {
+        _time_series_compaction_goal_size_mbytes = goal_size_mbytes;
+    }
+    int64_t time_series_compaction_goal_size_mbytes() const {
+        return _time_series_compaction_goal_size_mbytes;
+    }
+    void set_time_series_compaction_file_count_threshold(int64_t file_count_threshold) {
+        _time_series_compaction_file_count_threshold = file_count_threshold;
+    }
+    int64_t time_series_compaction_file_count_threshold() const {
+        return _time_series_compaction_file_count_threshold;
+    }
+    void set_time_series_compaction_time_threshold_seconds(int64_t time_threshold) {
+        _time_series_compaction_time_threshold_seconds = time_threshold;
+    }
+    int64_t time_series_compaction_time_threshold_seconds() const {
+        return _time_series_compaction_time_threshold_seconds;
+    }
+
 private:
     Status _save_meta(DataDir* data_dir);
 
@@ -274,6 +301,12 @@ private:
     // binlog config
     BinlogConfig _binlog_config {};
 
+    // meta for compaction
+    std::string _compaction_policy;
+    int64_t _time_series_compaction_goal_size_mbytes = 0;
+    int64_t _time_series_compaction_file_count_threshold = 0;
+    int64_t _time_series_compaction_time_threshold_seconds = 0;
+
     mutable std::shared_mutex _meta_lock;
 };
 
diff --git a/be/test/olap/cumulative_compaction_policy_test.cpp b/be/test/olap/cumulative_compaction_policy_test.cpp
index 83b779ceb1..dc14907e3e 100644
--- a/be/test/olap/cumulative_compaction_policy_test.cpp
+++ b/be/test/olap/cumulative_compaction_policy_test.cpp
@@ -340,7 +340,8 @@ TEST_F(TestSizeBasedCumulativeCompactionPolicy, calc_cumulative_compaction_score
     _tablet->calculate_cumulative_point();
 
     std::shared_ptr<CumulativeCompactionPolicy> cumulative_compaction_policy =
-            CumulativeCompactionPolicyFactory::create_cumulative_compaction_policy();
+            CumulativeCompactionPolicyFactory::create_cumulative_compaction_policy(
+                    CUMULATIVE_SIZE_BASED_POLICY);
     const uint32_t score = _tablet->calc_compaction_score(CompactionType::CUMULATIVE_COMPACTION,
                                                           cumulative_compaction_policy);
 
@@ -359,7 +360,8 @@ TEST_F(TestSizeBasedCumulativeCompactionPolicy, calc_cumulative_compaction_score
     _tablet->init();
     _tablet->calculate_cumulative_point();
     std::shared_ptr<CumulativeCompactionPolicy> cumulative_compaction_policy =
-            CumulativeCompactionPolicyFactory::create_cumulative_compaction_policy();
+            CumulativeCompactionPolicyFactory::create_cumulative_compaction_policy(
+                    CUMULATIVE_SIZE_BASED_POLICY);
     const uint32_t score = _tablet->calc_compaction_score(CompactionType::CUMULATIVE_COMPACTION,
                                                           cumulative_compaction_policy);
 
diff --git a/be/test/olap/cumulative_compaction_time_series_policy_test.cpp b/be/test/olap/cumulative_compaction_time_series_policy_test.cpp
index 74bcbe70ac..f9b9117233 100644
--- a/be/test/olap/cumulative_compaction_time_series_policy_test.cpp
+++ b/be/test/olap/cumulative_compaction_time_series_policy_test.cpp
@@ -35,17 +35,17 @@ namespace doris {
 
 class TestTimeSeriesCumulativeCompactionPolicy : public testing::Test {
 public:
-    TestTimeSeriesCumulativeCompactionPolicy() {}
+    TestTimeSeriesCumulativeCompactionPolicy() = default;
     void SetUp() {
-        config::compaction_policy = "time_series";
-        config::time_series_compaction_goal_size_mbytes = 1024;
-        config::time_series_compaction_file_count_threshold = 10;
-        config::time_series_compaction_time_threshold_seconds = 3600;
-
         _tablet_meta = static_cast<TabletMetaSharedPtr>(new TabletMeta(
                 1, 2, 15673, 15674, 4, 5, TTabletSchema(), 6, {{7, 8}}, UniqueId(9, 10),
                 TTabletType::TABLET_TYPE_DISK, TCompressionType::LZ4F));
 
+        _tablet_meta->set_compaction_policy(std::string(CUMULATIVE_TIME_SERIES_POLICY));
+        _tablet_meta->set_time_series_compaction_goal_size_mbytes(100);
+        _tablet_meta->set_time_series_compaction_file_count_threshold(10);
+        _tablet_meta->set_time_series_compaction_time_threshold_seconds(3600);
+
         _json_rowset_meta = R"({
             "rowset_id": 540081,
             "tablet_id": 15673,
@@ -106,28 +106,28 @@ public:
     void init_rs_meta_big_rowset(std::vector<RowsetMetaSharedPtr>* rs_metas) {
         RowsetMetaSharedPtr ptr1(new RowsetMeta());
         init_rs_meta(ptr1, 0, 1);
-        ptr1->set_total_disk_size(1024 * 1024 * 1024);
+        ptr1->set_total_disk_size(100 * 1024 * 1024);
         rs_metas->push_back(ptr1);
 
         RowsetMetaSharedPtr ptr2(new RowsetMeta());
         init_rs_meta(ptr2, 2, 3);
-        ptr2->set_total_disk_size(1024 * 1024 * 1024);
+        ptr2->set_total_disk_size(100 * 1024 * 1024);
         rs_metas->push_back(ptr2);
 
         RowsetMetaSharedPtr ptr3(new RowsetMeta());
         init_rs_meta(ptr3, 4, 4);
-        ptr3->set_total_disk_size(512 * 1024 * 1024);
+        ptr3->set_total_disk_size(51 * 1024 * 1024);
         rs_metas->push_back(ptr3);
 
         RowsetMetaSharedPtr ptr4(new RowsetMeta());
         init_rs_meta(ptr4, 5, 5);
         ptr4->set_segments_overlap(OVERLAPPING);
-        ptr4->set_total_disk_size(512 * 1024 * 1024);
+        ptr4->set_total_disk_size(51 * 1024 * 1024);
         rs_metas->push_back(ptr4);
 
         RowsetMetaSharedPtr ptr5(new RowsetMeta());
         init_rs_meta(ptr5, 6, 6);
-        ptr5->set_total_disk_size(512 * 1024 * 1024);
+        ptr5->set_total_disk_size(51 * 1024 * 1024);
         rs_metas->push_back(ptr5);
     }
 
@@ -336,7 +336,8 @@ TEST_F(TestTimeSeriesCumulativeCompactionPolicy, calc_cumulative_compaction_scor
     _tablet->calculate_cumulative_point();
 
    std::shared_ptr<CumulativeCompactionPolicy> cumulative_compaction_policy =
-            CumulativeCompactionPolicyFactory::create_cumulative_compaction_policy();
+            CumulativeCompactionPolicyFactory::create_cumulative_compaction_policy(
+                    CUMULATIVE_TIME_SERIES_POLICY);
     const uint32_t score = _tablet->calc_compaction_score(CompactionType::CUMULATIVE_COMPACTION,
                                                           cumulative_compaction_policy);
 
@@ -355,7 +356,8 @@ TEST_F(TestTimeSeriesCumulativeCompactionPolicy, calc_cumulative_compaction_scor
     _tablet->init();
     _tablet->calculate_cumulative_point();
     std::shared_ptr<CumulativeCompactionPolicy> cumulative_compaction_policy =
-            CumulativeCompactionPolicyFactory::create_cumulative_compaction_policy();
+            CumulativeCompactionPolicyFactory::create_cumulative_compaction_policy(
+                    CUMULATIVE_TIME_SERIES_POLICY);
     const uint32_t score = _tablet->calc_compaction_score(CompactionType::CUMULATIVE_COMPACTION,
                                                           cumulative_compaction_policy);
 
diff --git a/docs/en/docs/admin-manual/config/be-config.md b/docs/en/docs/admin-manual/config/be-config.md
index 1457e935d4..1765d1ca88 100644
--- a/docs/en/docs/admin-manual/config/be-config.md
+++ b/docs/en/docs/admin-manual/config/be-config.md
@@ -659,33 +659,6 @@ BaseCompaction:546859:
 * Description: Minimal interval (s) to update peer replica infos
 * Default value: 60 (s)
 
-#### `compaction_policy`
-
-* Type: string
-* Description: Configure the compaction strategy in the compression phase. Currently, two compaction strategies are implemented, size_based and time_series.
-  - size_based: Version merging can only be performed when the disk volume of the rowset is the same order of magnitude. After merging, qualified rowsets are promoted to the base compaction stage. In the case of a large number of small batch imports, it can reduce the write magnification of base compact, balance the read magnification and space magnification, and reduce the data of file versions.
-  - time_series: When the disk size of a rowset accumulates to a certain threshold, version merging takes place. The merged rowset is directly promoted to the base compaction stage. This approach effectively reduces the write amplification rate of compaction, especially in scenarios with continuous imports in a time series context.
-* Default value: size_based
-
-#### `time_series_compaction_goal_size_mbytes`
-
-* Type: int64
-* Description: Enabling time series compaction will utilize this parameter to adjust the size of input files for each compaction. The output file size will be approximately equal to the input file size.
-* Default value: 512
-
-#### `time_series_compaction_file_count_threshold`
-
-* Type: int64
-* Description: Enabling time series compaction will utilize this parameter to adjust the minimum number of input files for each compaction. It comes into effect only when the condition specified by time_series_compaction_goal_size_mbytes is not met.
-  - If the number of files in a tablet exceeds the configured threshold, it will trigger a compaction process.
-* Default value: 2000
-
-#### `time_series_compaction_time_threshold_seconds`
-
-* Type: int64
-* Description: When time series compaction is enabled, a significant duration passes without a compaction being executed, a compaction will be triggered.
-* Default value: 3600 (s)
-
 
 ### Load
 
diff --git a/docs/en/docs/sql-manual/sql-reference/Data-Definition-Statements/Create/CREATE-TABLE.md b/docs/en/docs/sql-manual/sql-reference/Data-Definition-Statements/Create/CREATE-TABLE.md
index ab9e3a6f0f..2727e2f734 100644
--- a/docs/en/docs/sql-manual/sql-reference/Data-Definition-Statements/Create/CREATE-TABLE.md
+++ b/docs/en/docs/sql-manual/sql-reference/Data-Definition-Statements/Create/CREATE-TABLE.md
@@ -409,6 +409,37 @@ Set table properties. The following attributes are currently supported:
 
    `"skip_write_index_on_load" = "false"`
 
+* `compaction_policy`
+
+    Configure the compaction merge policy of this table. Only "time_series" and "size_based" are supported.
+
+    time_series: When the disk size of a rowset accumulates to a certain threshold, version merging takes place. The merged rowset is directly promoted to the base compaction stage. This approach effectively reduces the write amplification rate of compaction, especially in scenarios with continuous imports in a time series context.
+
+    In the case of time series compaction, the execution of compaction is adjusted using parameters that have the prefix time_series_compaction.
+
+    `"compaction_policy" = "time_series"`
+
+* `time_series_compaction_goal_size_mbytes`
+
+    The time_series compaction policy will use this parameter to adjust the size of the input files for each compaction. The output file size will be approximately equal to the input file size.
+
+    `"time_series_compaction_goal_size_mbytes" = "1024"`
+
+* `time_series_compaction_file_count_threshold`
+
+    The time_series compaction policy will use this parameter to adjust the minimum number of input files for each compaction.
+
+    If the number of files in a tablet exceeds the configured threshold, a compaction process will be triggered.
+
+    `"time_series_compaction_file_count_threshold" = "2000"`
+
+* `time_series_compaction_time_threshold_seconds`
+
+    When the time_series compaction policy is applied and no compaction has been executed for a significant duration, a compaction will be triggered.
+
+    `"time_series_compaction_time_threshold_seconds" = "3600"`
+
+
 * Dynamic partition related
 
    The relevant parameters of dynamic partition are as follows:
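
Taken together, the four properties documented above would normally appear in a
single PROPERTIES clause; a brief sketch using the same example values as the
documentation:

    PROPERTIES (
        "compaction_policy" = "time_series",
        "time_series_compaction_goal_size_mbytes" = "1024",
        "time_series_compaction_file_count_threshold" = "2000",
        "time_series_compaction_time_threshold_seconds" = "3600"
    )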
diff --git a/docs/zh-CN/docs/admin-manual/config/be-config.md b/docs/zh-CN/docs/admin-manual/config/be-config.md
index 75bdda8533..2a704cb8e9 100644
--- a/docs/zh-CN/docs/admin-manual/config/be-config.md
+++ b/docs/zh-CN/docs/admin-manual/config/be-config.md
@@ -673,33 +673,6 @@ BaseCompaction:546859:
 * 描述:更新 peer replica infos 的最小间隔时间
 * 默认值:60(s)
 
-#### `compaction_policy`
-
-* 类型:string
-* 描述:配置 compaction 的合并策略,目前实现了两种合并策略,size_based 和 time_series
-  - size_based: 仅当 rowset 的磁盘体积在相同数量级时才进行版本合并。合并之后满足条件的 rowset 进行晋升到 base compaction阶段。能够做到在大量小批量导入的情况下:降低base compact的写入放大率,并在读取放大率和空间放大率之间进行权衡,同时减少了文件版本的数据。
-  - time_series: 当 rowset 的磁盘体积积攒到一定大小时进行版本合并。合并后的 rowset 直接晋升到 base compaction 阶段。在时序场景持续导入的情况下有效降低 compact 的写入放大率。
-* 默认值:size_based
-
-#### `time_series_compaction_goal_size_mbytes`
-
-* 类型:int64
-* 描述:开启 time series compaction 时,将使用此参数来调整每次 compaction 输入的文件的大小,输出的文件大小和输入相当
-* 默认值:512
-
-#### `time_series_compaction_file_count_threshold`
-
-* 类型:int64
-* 描述:开启 time series compaction 时,将使用此参数来调整每次 compaction 输入的文件数量的最小值,只有当 time_series_compaction_goal_size_mbytes 条件不满足时,该参数才会发挥作用
-  - 一个 tablet 中文件数超过该配置,会触发 compaction
-* 默认值:2000
-
-#### `time_series_compaction_time_threshold_seconds`
-
-* 类型:int64
-* 描述:开启 time series compaction 时,将使用此参数来调整 compaction 的最长时间间隔,即长时间未执行过 compaction 时,就会触发一次 compaction,单位为秒
-* 默认值:3600
-
 
 ### 导入
 
diff --git a/docs/zh-CN/docs/sql-manual/sql-reference/Data-Definition-Statements/Create/CREATE-TABLE.md b/docs/zh-CN/docs/sql-manual/sql-reference/Data-Definition-Statements/Create/CREATE-TABLE.md
index 2fc5d23009..0d14ba3f4e 100644
--- a/docs/zh-CN/docs/sql-manual/sql-reference/Data-Definition-Statements/Create/CREATE-TABLE.md
+++ b/docs/zh-CN/docs/sql-manual/sql-reference/Data-Definition-Statements/Create/CREATE-TABLE.md
@@ -393,6 +393,36 @@ UNIQUE KEY(k1, k2)
 
     `"skip_write_index_on_load" = "false"`
 
+* `compaction_policy`
+
+    配置这个表的 compaction 的合并策略,仅支持配置为 time_series 或者 size_based
+
+    time_series: 当 rowset 的磁盘体积积攒到一定大小时进行版本合并。合并后的 rowset 直接晋升到 base compaction 阶段。在时序场景持续导入的情况下有效降低 compact 的写入放大率
+
+    此策略将使用 time_series_compaction 为前缀的参数调整 compaction 的执行
+
+    `"compaction_policy" = ""`
+
+* `time_series_compaction_goal_size_mbytes`
+
+    compaction 的合并策略为 time_series 时,将使用此参数来调整每次 compaction 输入的文件的大小,输出的文件大小和输入相当
+
+    `"time_series_compaction_goal_size_mbytes" = "1024"`
+
+* `time_series_compaction_file_count_threshold`
+
+    compaction 的合并策略为 time_series 时,将使用此参数来调整每次 compaction 输入的文件数量的最小值
+
+    一个 tablet 中,文件数超过该配置,就会触发 compaction
+
+    `"time_series_compaction_file_count_threshold" = "2000"`
+
+* `time_series_compaction_time_threshold_seconds`
+
+    compaction 的合并策略为 time_series 时,将使用此参数来调整 compaction 的最长时间间隔,即长时间未执行过 compaction 时,就会触发一次 compaction,单位为秒
+
+    `"time_series_compaction_time_threshold_seconds" = "3600"`
+
 * 动态分区相关
 
     动态分区相关参数如下:
diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java b/fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java
index 53eb9052e8..f8c90b8a52 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java
@@ -211,6 +211,28 @@ public class Alter {
         } else if (currentAlterOps.checkIsBeingSynced(alterClauses)) {
             olapTable.setIsBeingSynced(currentAlterOps.isBeingSynced(alterClauses));
             needProcessOutsideTableLock = true;
+        } else if (currentAlterOps.checkCompactionPolicy(alterClauses)
+                    && currentAlterOps.getCompactionPolicy(alterClauses) != olapTable.getCompactionPolicy()) {
+            olapTable.setCompactionPolicy(currentAlterOps.getCompactionPolicy(alterClauses));
+            needProcessOutsideTableLock = true;
+        } else if (currentAlterOps.checkTimeSeriesCompactionGoalSizeMbytes(alterClauses)
+                    && currentAlterOps.getTimeSeriesCompactionGoalSizeMbytes(alterClauses)
+                                                != olapTable.getTimeSeriesCompactionGoalSizeMbytes()) {
+            olapTable.setTimeSeriesCompactionGoalSizeMbytes(currentAlterOps
+                                            .getTimeSeriesCompactionGoalSizeMbytes(alterClauses));
+            needProcessOutsideTableLock = true;
+        } else if (currentAlterOps.checkTimeSeriesCompactionFileCountThreshold(alterClauses)
+                    && currentAlterOps.getTimeSeriesCompactionFileCountThreshold(alterClauses)
+                                                != olapTable.getTimeSeriesCompactionFileCountThreshold()) {
+            olapTable.setTimeSeriesCompactionFileCountThreshold(currentAlterOps
+                                            .getTimeSeriesCompactionFileCountThreshold(alterClauses));
+            needProcessOutsideTableLock = true;
+        } else if (currentAlterOps.checkTimeSeriesCompactionTimeThresholdSeconds(alterClauses)
+                    && currentAlterOps.getTimeSeriesCompactionTimeThresholdSeconds(alterClauses)
+                                                != olapTable.getTimeSeriesCompactionTimeThresholdSeconds()) {
+            olapTable.setTimeSeriesCompactionTimeThresholdSeconds(currentAlterOps
+                                            .getTimeSeriesCompactionTimeThresholdSeconds(alterClauses));
+            needProcessOutsideTableLock = true;
         } else if (currentAlterOps.checkBinlogConfigChange(alterClauses)) {
             if (!Config.enable_feature_binlog) {
                 throw new DdlException("Binlog feature is not enabled");
@@ -514,7 +536,13 @@ public class Alter {
                 // currently, only in memory and storage policy property could reach here
                 Preconditions.checkState(properties.containsKey(PropertyAnalyzer.PROPERTIES_INMEMORY)
                         || properties.containsKey(PropertyAnalyzer.PROPERTIES_STORAGE_POLICY)
-                        || properties.containsKey(PropertyAnalyzer.PROPERTIES_IS_BEING_SYNCED));
+                        || properties.containsKey(PropertyAnalyzer.PROPERTIES_IS_BEING_SYNCED)
+                        || properties.containsKey(PropertyAnalyzer.PROPERTIES_COMPACTION_POLICY)
+                        || properties.containsKey(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_GOAL_SIZE_MBYTES)
+                        || properties
+                            .containsKey(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_FILE_COUNT_THRESHOLD)
+                        || properties
+                            .containsKey(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_TIME_THRESHOLD_SECONDS));
                 ((SchemaChangeHandler) schemaChangeHandler).updateTableProperties(db, tableName, properties);
             } else {
                 throw new DdlException("Invalid alter operation: " + alterClause.getOpType());
diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/AlterOperations.java b/fe/fe-core/src/main/java/org/apache/doris/alter/AlterOperations.java
index 22104522ed..62721fa37e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/alter/AlterOperations.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/alter/AlterOperations.java
@@ -93,6 +93,60 @@ public class AlterOperations {
             || clause.getProperties().containsKey(PropertyAnalyzer.PROPERTIES_BINLOG_MAX_HISTORY_NUMS));
     }
 
+    public boolean checkCompactionPolicy(List<AlterClause> alterClauses) {
+        return alterClauses.stream().filter(clause ->
+            clause instanceof ModifyTablePropertiesClause
+        ).anyMatch(clause -> clause.getProperties().containsKey(PropertyAnalyzer.PROPERTIES_COMPACTION_POLICY));
+    }
+
+    public String getCompactionPolicy(List<AlterClause> alterClauses) {
+        return alterClauses.stream().filter(clause ->
+            clause instanceof ModifyTablePropertiesClause
+        ).map(c -> ((ModifyTablePropertiesClause) c).compactionPolicy()).findFirst().orElse("");
+    }
+
+    public boolean checkTimeSeriesCompactionGoalSizeMbytes(List<AlterClause> alterClauses) {
+        return alterClauses.stream().filter(clause ->
+            clause instanceof ModifyTablePropertiesClause
+        ).anyMatch(clause -> clause.getProperties()
+                    .containsKey(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_GOAL_SIZE_MBYTES));
+    }
+
+    public long getTimeSeriesCompactionGoalSizeMbytes(List<AlterClause> alterClauses) {
+        return alterClauses.stream().filter(clause ->
+            clause instanceof ModifyTablePropertiesClause
+        ).map(c -> ((ModifyTablePropertiesClause) c)
+        .timeSeriesCompactionGoalSizeMbytes()).findFirst().orElse((long) -1);
+    }
+
+    public boolean checkTimeSeriesCompactionFileCountThreshold(List<AlterClause> alterClauses) {
+        return alterClauses.stream().filter(clause ->
+            clause instanceof ModifyTablePropertiesClause
+        ).anyMatch(clause -> clause.getProperties()
+                    .containsKey(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_FILE_COUNT_THRESHOLD));
+    }
+
+    public long getTimeSeriesCompactionFileCountThreshold(List<AlterClause> alterClauses) {
+        return alterClauses.stream().filter(clause ->
+            clause instanceof ModifyTablePropertiesClause
+        ).map(c -> ((ModifyTablePropertiesClause) c)
+        .timeSeriesCompactionFileCountThreshold()).findFirst().orElse((long) -1);
+    }
+
+    public boolean checkTimeSeriesCompactionTimeThresholdSeconds(List<AlterClause> alterClauses) {
+        return alterClauses.stream().filter(clause ->
+            clause instanceof ModifyTablePropertiesClause
+        ).anyMatch(clause -> clause.getProperties()
+                    .containsKey(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_TIME_THRESHOLD_SECONDS));
+    }
+
+    public long getTimeSeriesCompactionTimeThresholdSeconds(List<AlterClause> alterClauses) {
+        return alterClauses.stream().filter(clause ->
+            clause instanceof ModifyTablePropertiesClause
+        ).map(c -> ((ModifyTablePropertiesClause) c)
+        .timeSeriesCompactionTimeThresholdSeconds()).findFirst().orElse((long) -1);
+    }
+
     public boolean isBeingSynced(List<AlterClause> alterClauses) {
         return alterClauses.stream().filter(clause ->
             clause instanceof ModifyTablePropertiesClause
diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java b/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java
index e05130b103..5ab9f77a88 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java
@@ -284,6 +284,10 @@ public class RollupJobV2 extends AlterJobV2 implements GsonPostProcessable {
                                 tbl.disableAutoCompaction(),
                                 tbl.enableSingleReplicaCompaction(),
                                 tbl.skipWriteIndexOnLoad(),
+                                tbl.getCompactionPolicy(),
+                                tbl.getTimeSeriesCompactionGoalSizeMbytes(),
+                                tbl.getTimeSeriesCompactionFileCountThreshold(),
+                                tbl.getTimeSeriesCompactionTimeThresholdSeconds(),
                                 tbl.storeRowColumn(),
                                 tbl.isDynamicSchema(),
                                 binlogConfig);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java
index ff2fcc3fd6..df7fdc81c0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java
@@ -2137,13 +2137,43 @@ public class SchemaChangeHandler extends AlterHandler {
         }
         long storagePolicyId = storagePolicyNameToId(storagePolicy);
 
-        if (isInMemory < 0 && storagePolicyId < 0) {
-            LOG.info("Properties already up-to-date");
-            return;
+        String compactionPolicy = properties.get(PropertyAnalyzer.PROPERTIES_COMPACTION_POLICY);
+        if (compactionPolicy != null
+                                    && !compactionPolicy.equals(PropertyAnalyzer.TIME_SERIES_COMPACTION_POLICY)
+                                    && !compactionPolicy.equals(PropertyAnalyzer.SIZE_BASED_COMPACTION_POLICY)) {
+            throw new UserException("Table compaction policy only supports "
+                                                + PropertyAnalyzer.TIME_SERIES_COMPACTION_POLICY
+                                                + " or " + PropertyAnalyzer.SIZE_BASED_COMPACTION_POLICY);
+        }
+
+        Map<String, Long> timeSeriesCompactionConfig = new HashMap<>();
+        if (properties.containsKey(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_GOAL_SIZE_MBYTES)) {
+            timeSeriesCompactionConfig
+                        .put(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_GOAL_SIZE_MBYTES,
+                        Long.parseLong(properties
+                        .get(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_GOAL_SIZE_MBYTES)));
+        }
+        if (properties.containsKey(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_FILE_COUNT_THRESHOLD)) {
+            timeSeriesCompactionConfig
+                        .put(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_FILE_COUNT_THRESHOLD,
+                        Long.parseLong(properties
+                        .get(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_FILE_COUNT_THRESHOLD)));
+        }
+        if (properties.containsKey(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_TIME_THRESHOLD_SECONDS)) {
+            timeSeriesCompactionConfig
+                        .put(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_TIME_THRESHOLD_SECONDS,
+                        Long.parseLong(properties
+                        .get(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_TIME_THRESHOLD_SECONDS)));
         }
 
         for (Partition partition : partitions) {
-            updatePartitionProperties(db, olapTable.getName(), partition.getName(), storagePolicyId, isInMemory, null);
+            updatePartitionProperties(db, olapTable.getName(), partition.getName(), storagePolicyId, isInMemory,
+                                        null, compactionPolicy, timeSeriesCompactionConfig);
+        }
+
+        if (isInMemory < 0 && storagePolicyId < 0 && compactionPolicy == null && timeSeriesCompactionConfig.isEmpty()) {
+            LOG.info("Properties already up-to-date");
+            return;
         }
 
         olapTable.writeLockOrDdlException();
@@ -2274,6 +2304,87 @@ public class SchemaChangeHandler extends AlterHandler {
         }
     }
 
+    /**
+     * Update the properties of one partition of a table, identified by partition name.
+     * This operation may succeed only partially; in that case an exception is thrown to tell the user to retry.
+     */
+    public void updatePartitionProperties(Database db, String tableName, String partitionName, long storagePolicyId,
+                                          int isInMemory, BinlogConfig binlogConfig, String compactionPolicy,
+                                          Map<String, Long> timeSeriesCompactionConfig) throws UserException {
+        // be id -> <tablet id,schemaHash>
+        Map<Long, Set<Pair<Long, Integer>>> beIdToTabletIdWithHash = Maps.newHashMap();
+        OlapTable olapTable = (OlapTable) db.getTableOrMetaException(tableName, Table.TableType.OLAP);
+        olapTable.readLock();
+        try {
+            Partition partition = olapTable.getPartition(partitionName);
+            if (partition == null) {
+                throw new DdlException(
+                        "Partition[" + partitionName + "] does not exist in table[" + olapTable.getName() + "]");
+            }
+
+            for (MaterializedIndex index : partition.getMaterializedIndices(IndexExtState.VISIBLE)) {
+                int schemaHash = olapTable.getSchemaHashByIndexId(index.getId());
+                for (Tablet tablet : index.getTablets()) {
+                    for (Replica replica : tablet.getReplicas()) {
+                        Set<Pair<Long, Integer>> tabletIdWithHash = beIdToTabletIdWithHash.computeIfAbsent(
+                                replica.getBackendId(), k -> Sets.newHashSet());
+                        tabletIdWithHash.add(Pair.of(tablet.getId(), schemaHash));
+                    }
+                }
+            }
+        } finally {
+            olapTable.readUnlock();
+        }
+
+        int totalTaskNum = beIdToTabletIdWithHash.keySet().size();
+        MarkedCountDownLatch<Long, Set<Pair<Long, Integer>>> countDownLatch = new MarkedCountDownLatch<>(totalTaskNum);
+        AgentBatchTask batchTask = new AgentBatchTask();
+        for (Map.Entry<Long, Set<Pair<Long, Integer>>> kv : beIdToTabletIdWithHash.entrySet()) {
+            countDownLatch.addMark(kv.getKey(), kv.getValue());
+            UpdateTabletMetaInfoTask task = new UpdateTabletMetaInfoTask(kv.getKey(), kv.getValue(), isInMemory,
+                    storagePolicyId, binlogConfig, countDownLatch, compactionPolicy, timeSeriesCompactionConfig);
+            batchTask.addTask(task);
+        }
+        if (!FeConstants.runningUnitTest) {
+            // send all tasks and wait for them to finish
+            AgentTaskQueue.addBatchTask(batchTask);
+            AgentTaskExecutor.submit(batchTask);
+            LOG.info("send update tablet meta task for table {}, partition {}, number: {}", tableName, partitionName,
+                    batchTask.getTaskNum());
+
+            // estimate timeout
+            long timeout = Config.tablet_create_timeout_second * 1000L * totalTaskNum;
+            timeout = Math.min(timeout, Config.max_create_table_timeout_second * 1000);
+            boolean ok = false;
+            try {
+                ok = countDownLatch.await(timeout, TimeUnit.MILLISECONDS);
+            } catch (InterruptedException e) {
+                LOG.warn("InterruptedException: ", e);
+            }
+
+            if (!ok || !countDownLatch.getStatus().ok()) {
+                String errMsg = "Failed to update tablet meta of partition[" + partitionName + "].";
+                // clear tasks
+                AgentTaskQueue.removeBatchTask(batchTask, TTaskType.UPDATE_TABLET_META_INFO);
+
+                if (!countDownLatch.getStatus().ok()) {
+                    errMsg += " Error: " + countDownLatch.getStatus().getErrorMsg();
+                } else {
+                    List<Map.Entry<Long, Set<Pair<Long, Integer>>>> unfinishedMarks = countDownLatch.getLeftMarks();
+                    // only show at most 3 results
+                    List<Map.Entry<Long, Set<Pair<Long, Integer>>>> subList = unfinishedMarks.subList(0,
+                            Math.min(unfinishedMarks.size(), 3));
+                    if (!subList.isEmpty()) {
+                        errMsg += " Unfinished mark: " + Joiner.on(", ").join(subList);
+                    }
+                }
+                errMsg += ". This operation may have succeeded only partially; please retry until it succeeds.";
+                LOG.warn(errMsg);
+                throw new DdlException(errMsg);
+            }
+        }
+    }
+
     @Override
     public void cancel(CancelStmt stmt) throws DdlException {
         CancelAlterTableStmt cancelAlterTableStmt = (CancelAlterTableStmt) stmt;
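
The new updatePartitionProperties above deliberately reports partial success through an exception, so callers are expected to retry the whole call. A minimal caller-side sketch under that assumption (the retry loop and the schemaChangeHandler/db/"tbl"/"p1" names and values are illustrative, not part of this patch; only the method itself, the "do not change" sentinels and the property key come from the code above):

    // hypothetical retry wrapper; identifiers other than updatePartitionProperties are assumptions
    Map<String, Long> tsConfig = new HashMap<>();
    tsConfig.put(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_GOAL_SIZE_MBYTES, 2048L);
    int maxRetries = 3;
    for (int attempt = 1; attempt <= maxRetries; attempt++) {
        try {
            schemaChangeHandler.updatePartitionProperties(db, "tbl", "p1",
                    -1 /* keep storage policy */, -1 /* keep in-memory flag */,
                    null /* binlog config unchanged */, "time_series", tsConfig);
            break; // every replica acknowledged the new tablet meta
        } catch (UserException e) {
            // the update may have reached only part of the replicas; re-sending the same meta is safe
            if (attempt == maxRetries) {
                throw e;
            }
        }
    }
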
diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java
index cc10efa951..2be16f6119 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java
@@ -279,6 +279,10 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
                                     tbl.disableAutoCompaction(),
                                     tbl.enableSingleReplicaCompaction(),
                                     tbl.skipWriteIndexOnLoad(),
+                                    tbl.getCompactionPolicy(),
+                                    tbl.getTimeSeriesCompactionGoalSizeMbytes(),
+                                    tbl.getTimeSeriesCompactionFileCountThreshold(),
+                                    tbl.getTimeSeriesCompactionTimeThresholdSeconds(),
                                     tbl.storeRowColumn(),
                                     tbl.isDynamicSchema(),
                                     binlogConfig);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/ModifyTablePropertiesClause.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/ModifyTablePropertiesClause.java
index 93ece524f2..1ef31b3734 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/ModifyTablePropertiesClause.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/ModifyTablePropertiesClause.java
@@ -54,6 +54,46 @@ public class ModifyTablePropertiesClause extends AlterTableClause {
         return isBeingSynced;
     }
 
+    private String compactionPolicy;
+
+    private long timeSeriesCompactionGoalSizeMbytes;
+
+    private long timeSeriesCompactionFileCountThreshold;
+
+    private long timeSeriesCompactionTimeThresholdSeconds;
+
+    public void setCompactionPolicy(String compactionPolicy) {
+        this.compactionPolicy = compactionPolicy;
+    }
+
+    public String compactionPolicy() {
+        return compactionPolicy;
+    }
+
+    public void setTimeSeriesCompactionGoalSizeMbytes(long timeSeriesCompactionGoalSizeMbytes) {
+        this.timeSeriesCompactionGoalSizeMbytes = timeSeriesCompactionGoalSizeMbytes;
+    }
+
+    public long timeSeriesCompactionGoalSizeMbytes() {
+        return timeSeriesCompactionGoalSizeMbytes;
+    }
+
+    public void setTimeSeriesCompactionFileCountThreshold(long timeSeriesCompactionFileCountThreshold) {
+        this.timeSeriesCompactionFileCountThreshold = timeSeriesCompactionFileCountThreshold;
+    }
+
+    public Long timeSeriesCompactionFileCountThreshold() {
+        return timeSeriesCompactionFileCountThreshold;
+    }
+
+    public void setTimeSeriesCompactionTimeThresholdSeconds(long timeSeriesCompactionTimeThresholdSeconds) {
+        this.timeSeriesCompactionTimeThresholdSeconds = timeSeriesCompactionTimeThresholdSeconds;
+    }
+
+    public Long timeSeriesCompactionTimeThresholdSeconds() {
+        return timeSeriesCompactionTimeThresholdSeconds;
+    }
+
     public ModifyTablePropertiesClause(Map<String, String> properties) {
         super(AlterOpType.MODIFY_TABLE_PROPERTY);
         this.properties = properties;
@@ -142,6 +182,65 @@ public class ModifyTablePropertiesClause extends AlterTableClause {
                 || properties.containsKey(PropertyAnalyzer.PROPERTIES_BINLOG_MAX_BYTES)
                 || properties.containsKey(PropertyAnalyzer.PROPERTIES_BINLOG_MAX_HISTORY_NUMS)) {
             // do nothing, will be alter in SchemaChangeHandler.updateBinlogConfig
+        } else if (properties.containsKey(PropertyAnalyzer.PROPERTIES_COMPACTION_POLICY)) {
+            String compactionPolicy = properties.getOrDefault(PropertyAnalyzer.PROPERTIES_COMPACTION_POLICY, "");
+            if (compactionPolicy != null
+                                    && !compactionPolicy.equals(PropertyAnalyzer.TIME_SERIES_COMPACTION_POLICY)
+                                    && !compactionPolicy.equals(PropertyAnalyzer.SIZE_BASED_COMPACTION_POLICY)) {
+                throw new AnalysisException(
+                        "Table compaction policy only supports " + PropertyAnalyzer.TIME_SERIES_COMPACTION_POLICY
+                        + " or " + PropertyAnalyzer.SIZE_BASED_COMPACTION_POLICY);
+            }
+            this.needTableStable = false;
+            setCompactionPolicy(compactionPolicy);
+        } else if (properties.containsKey(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_GOAL_SIZE_MBYTES)) {
+            long goalSizeMbytes;
+            String goalSizeMbytesStr = properties
+                                        .get(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_GOAL_SIZE_MBYTES);
+            try {
+                goalSizeMbytes = Long.parseLong(goalSizeMbytesStr);
+                if (goalSizeMbytes < 10) {
+                    throw new AnalysisException("time_series_compaction_goal_size_mbytes can not be less than 10: "
+                        + goalSizeMbytesStr);
+                }
+            } catch (NumberFormatException e) {
+                throw new AnalysisException("Invalid time_series_compaction_goal_size_mbytes format: "
+                        + goalSizeMbytesStr);
+            }
+            this.needTableStable = false;
+            setTimeSeriesCompactionGoalSizeMbytes(goalSizeMbytes);
+        } else if (properties.containsKey(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_FILE_COUNT_THRESHOLD)) {
+            long fileCountThreshold;
+            String fileCountThresholdStr = properties
+                                        .get(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_FILE_COUNT_THRESHOLD);
+            try {
+                fileCountThreshold = Long.parseLong(fileCountThresholdStr);
+                if (fileCountThreshold < 10) {
+                    throw new AnalysisException("time_series_compaction_file_count_threshold can not be less than 10: "
+                            + fileCountThresholdStr);
+                }
+            } catch (NumberFormatException e) {
+                throw new AnalysisException("Invalid time_series_compaction_file_count_threshold format: "
+                        + fileCountThresholdStr);
+            }
+            this.needTableStable = false;
+            setTimeSeriesCompactionFileCountThreshold(fileCountThreshold);
+        } else if (properties.containsKey(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_TIME_THRESHOLD_SECONDS)) {
+            long timeThresholdSeconds;
+            String timeThresholdSecondsStr = properties
+                                    .get(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_TIME_THRESHOLD_SECONDS);
+            try {
+                timeThresholdSeconds = Long.parseLong(timeThresholdSecondsStr);
+                if (timeThresholdSeconds < 60) {
+                    throw new AnalysisException("time_series_compaction_time_threshold_seconds can not be less than 60: "
+                            + timeThresholdSecondsStr);
+                }
+            } catch (NumberFormatException e) {
+                throw new AnalysisException("Invalid time_series_compaction_time_threshold_seconds format: "
+                        + timeThresholdSecondsStr);
+            }
+            this.needTableStable = false;
+            setTimeSeriesCompactionTimeThresholdSeconds(timeThresholdSeconds);
         } else {
             throw new AnalysisException("Unknown table property: " + properties.keySet());
         }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java 
b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
index a5256dac64..e1c0e4a29e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
@@ -1043,6 +1043,10 @@ public class RestoreJob extends AbstractJob {
                             localTbl.disableAutoCompaction(),
                             localTbl.enableSingleReplicaCompaction(),
                             localTbl.skipWriteIndexOnLoad(),
+                            localTbl.getCompactionPolicy(),
+                            localTbl.getTimeSeriesCompactionGoalSizeMbytes(),
+                            
localTbl.getTimeSeriesCompactionFileCountThreshold(),
+                            
localTbl.getTimeSeriesCompactionTimeThresholdSeconds(),
                             localTbl.storeRowColumn(),
                             localTbl.isDynamicSchema(),
                             binlogConfig);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
index f1bb519290..90ee5a9f29 100755
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
@@ -3135,6 +3135,37 @@ public class Env {
                 sb.append(olapTable.skipWriteIndexOnLoad()).append("\"");
             }
 
+            // compaction policy
+            if (olapTable.getCompactionPolicy() != null && 
!olapTable.getCompactionPolicy().equals("")
+                    && 
!olapTable.getCompactionPolicy().equals(PropertyAnalyzer.SIZE_BASED_COMPACTION_POLICY))
 {
+                
sb.append(",\n\"").append(PropertyAnalyzer.PROPERTIES_COMPACTION_POLICY).append("\"
 = \"");
+                sb.append(olapTable.getCompactionPolicy()).append("\"");
+            }
+
+            // time series compaction goal size
+            if (olapTable.getCompactionPolicy() != null && 
olapTable.getCompactionPolicy()
+                                                            
.equals(PropertyAnalyzer.TIME_SERIES_COMPACTION_POLICY)) {
+                sb.append(",\n\"").append(PropertyAnalyzer
+                                    
.PROPERTIES_TIME_SERIES_COMPACTION_GOAL_SIZE_MBYTES).append("\" = \"");
+                
sb.append(olapTable.getTimeSeriesCompactionGoalSizeMbytes()).append("\"");
+            }
+
+            // time series compaction file count threshold
+            if (olapTable.getCompactionPolicy() != null && 
olapTable.getCompactionPolicy()
+                                                            
.equals(PropertyAnalyzer.TIME_SERIES_COMPACTION_POLICY)) {
+                sb.append(",\n\"").append(PropertyAnalyzer
+                                    
.PROPERTIES_TIME_SERIES_COMPACTION_FILE_COUNT_THRESHOLD).append("\" = \"");
+                
sb.append(olapTable.getTimeSeriesCompactionFileCountThreshold()).append("\"");
+            }
+
+            // time series compaction time threshold
+            if (olapTable.getCompactionPolicy() != null && 
olapTable.getCompactionPolicy()
+                                                            
.equals(PropertyAnalyzer.TIME_SERIES_COMPACTION_POLICY)) {
+                sb.append(",\n\"").append(PropertyAnalyzer
+                                    
.PROPERTIES_TIME_SERIES_COMPACTION_TIME_THRESHOLD_SECONDS).append("\" = \"");
+                
sb.append(olapTable.getTimeSeriesCompactionTimeThresholdSeconds()).append("\"");
+            }
+
             // dynamic schema
             if (olapTable.isDynamicSchema()) {
                 
sb.append(",\n\"").append(PropertyAnalyzer.PROPERTIES_DYNAMIC_SCHEMA).append("\"
 = \"");
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
index ff5e3a8570..a242fa9eb7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
@@ -1844,6 +1844,62 @@ public class OlapTable extends Table {
         return false;
     }
 
+    public void setCompactionPolicy(String compactionPolicy) {
+        TableProperty tableProperty = getOrCreatTableProperty();
+        
tableProperty.modifyTableProperties(PropertyAnalyzer.PROPERTIES_COMPACTION_POLICY,
 compactionPolicy);
+        tableProperty.buildCompactionPolicy();
+    }
+
+    public String getCompactionPolicy() {
+        if (tableProperty != null) {
+            return tableProperty.compactionPolicy();
+        }
+        return "";
+    }
+
+    public void setTimeSeriesCompactionGoalSizeMbytes(long 
timeSeriesCompactionGoalSizeMbytes) {
+        TableProperty tableProperty = getOrCreatTableProperty();
+        
tableProperty.modifyTableProperties(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_GOAL_SIZE_MBYTES,
+                                                        
Long.valueOf(timeSeriesCompactionGoalSizeMbytes).toString());
+        tableProperty.buildTimeSeriesCompactionGoalSizeMbytes();
+    }
+
+    public Long getTimeSeriesCompactionGoalSizeMbytes() {
+        if (tableProperty != null) {
+            return tableProperty.timeSeriesCompactionGoalSizeMbytes();
+        }
+        return null;
+    }
+
+    public void setTimeSeriesCompactionFileCountThreshold(long 
timeSeriesCompactionFileCountThreshold) {
+        TableProperty tableProperty = getOrCreatTableProperty();
+        
tableProperty.modifyTableProperties(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_FILE_COUNT_THRESHOLD,
+                                                    
Long.valueOf(timeSeriesCompactionFileCountThreshold).toString());
+        tableProperty.buildTimeSeriesCompactionFileCountThreshold();
+    }
+
+    public Long getTimeSeriesCompactionFileCountThreshold() {
+        if (tableProperty != null) {
+            return tableProperty.timeSeriesCompactionFileCountThreshold();
+        }
+        return null;
+    }
+
+    public void setTimeSeriesCompactionTimeThresholdSeconds(long 
timeSeriesCompactionTimeThresholdSeconds) {
+        TableProperty tableProperty = getOrCreatTableProperty();
+        tableProperty.modifyTableProperties(PropertyAnalyzer
+                                                    
.PROPERTIES_TIME_SERIES_COMPACTION_TIME_THRESHOLD_SECONDS,
+                                                    
Long.valueOf(timeSeriesCompactionTimeThresholdSeconds).toString());
+        tableProperty.buildTimeSeriesCompactionTimeThresholdSeconds();
+    }
+
+    public Long getTimeSeriesCompactionTimeThresholdSeconds() {
+        if (tableProperty != null) {
+            return tableProperty.timeSeriesCompactionTimeThresholdSeconds();
+        }
+        return null;
+    }
+
     public Boolean isDynamicSchema() {
         if (tableProperty != null) {
             return tableProperty.isDynamicSchema();
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/catalog/TableProperty.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/TableProperty.java
index bced77425b..c8b6cdb53e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/TableProperty.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/TableProperty.java
@@ -86,6 +86,17 @@ public class TableProperty implements Writable {
 
     private boolean skipWriteIndexOnLoad = false;
 
+    private String compactionPolicy = PropertyAnalyzer.SIZE_BASED_COMPACTION_POLICY;
+
+    private long timeSeriesCompactionGoalSizeMbytes
+                                    = PropertyAnalyzer.TIME_SERIES_COMPACTION_GOAL_SIZE_MBYTES_DEFAULT_VALUE;
+
+    private long timeSeriesCompactionFileCountThreshold
+                                    = PropertyAnalyzer.TIME_SERIES_COMPACTION_FILE_COUNT_THRESHOLD_DEFAULT_VALUE;
+
+    private long timeSeriesCompactionTimeThresholdSeconds
+                                    = PropertyAnalyzer.TIME_SERIES_COMPACTION_TIME_THRESHOLD_SECONDS_DEFAULT_VALUE;
+
     private DataSortInfo dataSortInfo = new DataSortInfo();
 
     public TableProperty(Map<String, String> properties) {
@@ -214,6 +225,51 @@ public class TableProperty implements Writable {
         return skipWriteIndexOnLoad;
     }
 
+    public TableProperty buildCompactionPolicy() {
+        compactionPolicy = properties.getOrDefault(PropertyAnalyzer.PROPERTIES_COMPACTION_POLICY,
+                                                                PropertyAnalyzer.SIZE_BASED_COMPACTION_POLICY);
+        return this;
+    }
+
+    public String compactionPolicy() {
+        return compactionPolicy;
+    }
+
+    public TableProperty buildTimeSeriesCompactionGoalSizeMbytes() {
+        timeSeriesCompactionGoalSizeMbytes = Long.parseLong(properties
+                    .getOrDefault(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_GOAL_SIZE_MBYTES,
+                    String.valueOf(PropertyAnalyzer.TIME_SERIES_COMPACTION_GOAL_SIZE_MBYTES_DEFAULT_VALUE)));
+        return this;
+    }
+
+    public long timeSeriesCompactionGoalSizeMbytes() {
+        return timeSeriesCompactionGoalSizeMbytes;
+    }
+
+    public TableProperty buildTimeSeriesCompactionFileCountThreshold() {
+        timeSeriesCompactionFileCountThreshold = Long.parseLong(properties
+                    .getOrDefault(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_FILE_COUNT_THRESHOLD,
+                    String.valueOf(PropertyAnalyzer.TIME_SERIES_COMPACTION_FILE_COUNT_THRESHOLD_DEFAULT_VALUE)));
+
+        return this;
+    }
+
+    public long timeSeriesCompactionFileCountThreshold() {
+        return timeSeriesCompactionFileCountThreshold;
+    }
+
+    public TableProperty buildTimeSeriesCompactionTimeThresholdSeconds() {
+        timeSeriesCompactionTimeThresholdSeconds = Long.parseLong(properties
+                    .getOrDefault(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_TIME_THRESHOLD_SECONDS,
+                    String.valueOf(PropertyAnalyzer.TIME_SERIES_COMPACTION_TIME_THRESHOLD_SECONDS_DEFAULT_VALUE)));
+
+        return this;
+    }
+
+    public long timeSeriesCompactionTimeThresholdSeconds() {
+        return timeSeriesCompactionTimeThresholdSeconds;
+    }
+
     public TableProperty buildStoragePolicy() {
         storagePolicy = 
properties.getOrDefault(PropertyAnalyzer.PROPERTIES_STORAGE_POLICY, "");
         return this;
@@ -445,6 +501,10 @@ public class TableProperty implements Writable {
                 .buildEnableLightSchemaChange()
                 .buildStoreRowColumn()
                 .buildSkipWriteIndexOnLoad()
+                .buildCompactionPolicy()
+                .buildTimeSeriesCompactionGoalSizeMbytes()
+                .buildTimeSeriesCompactionFileCountThreshold()
+                .buildTimeSeriesCompactionTimeThresholdSeconds()
                 .buildDisableAutoCompaction()
                 .buildEnableSingleReplicaCompaction();
         if (Env.getCurrentEnvJournalVersion() < FeMetaVersion.VERSION_105) {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java 
b/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java
index b80d511256..c3570ff590 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java
@@ -127,6 +127,16 @@ public class PropertyAnalyzer {
 
     public static final String PROPERTIES_SKIP_WRITE_INDEX_ON_LOAD = 
"skip_write_index_on_load";
 
+    public static final String PROPERTIES_COMPACTION_POLICY = "compaction_policy";
+
+    public static final String PROPERTIES_TIME_SERIES_COMPACTION_GOAL_SIZE_MBYTES =
+                                                        "time_series_compaction_goal_size_mbytes";
+
+    public static final String PROPERTIES_TIME_SERIES_COMPACTION_FILE_COUNT_THRESHOLD =
+                                                        "time_series_compaction_file_count_threshold";
+
+    public static final String PROPERTIES_TIME_SERIES_COMPACTION_TIME_THRESHOLD_SECONDS =
+                                                        "time_series_compaction_time_threshold_seconds";
     public static final String PROPERTIES_MUTABLE = "mutable";
 
     public static final String PROPERTIES_IS_BEING_SYNCED = "is_being_synced";
@@ -152,6 +162,16 @@ public class PropertyAnalyzer {
     private static final double MAX_FPP = 0.05;
     private static final double MIN_FPP = 0.0001;
 
+    // compaction policy
+    public static final String SIZE_BASED_COMPACTION_POLICY = "size_based";
+    public static final String TIME_SERIES_COMPACTION_POLICY = "time_series";
+    public static final long TIME_SERIES_COMPACTION_GOAL_SIZE_MBYTES_DEFAULT_VALUE = 1024;
+    public static final long TIME_SERIES_COMPACTION_FILE_COUNT_THRESHOLD_DEFAULT_VALUE = 2000;
+    public static final long TIME_SERIES_COMPACTION_TIME_THRESHOLD_SECONDS_DEFAULT_VALUE = 3600;
+
+
+
+
     /**
      * check and replace members of DataProperty by properties.
      *
@@ -595,6 +615,94 @@ public class PropertyAnalyzer {
                 + " must be `true` or `false`");
     }
 
+    public static String analyzeCompactionPolicy(Map<String, String> properties) throws AnalysisException {
+        if (properties == null || properties.isEmpty()) {
+            return SIZE_BASED_COMPACTION_POLICY;
+        }
+        String compactionPolicy = SIZE_BASED_COMPACTION_POLICY;
+        if (properties.containsKey(PROPERTIES_COMPACTION_POLICY)) {
+            compactionPolicy = properties.get(PROPERTIES_COMPACTION_POLICY);
+            properties.remove(PROPERTIES_COMPACTION_POLICY);
+            if (compactionPolicy != null && !compactionPolicy.equals(TIME_SERIES_COMPACTION_POLICY)
+                    && !compactionPolicy.equals(SIZE_BASED_COMPACTION_POLICY)) {
+                throw new AnalysisException(PROPERTIES_COMPACTION_POLICY
+                        + " must be " + TIME_SERIES_COMPACTION_POLICY + " or " + SIZE_BASED_COMPACTION_POLICY);
+            }
+        }
+
+        return compactionPolicy;
+    }
+
+    public static long analyzeTimeSeriesCompactionGoalSizeMbytes(Map<String, String> properties)
+            throws AnalysisException {
+        long goalSizeMbytes = TIME_SERIES_COMPACTION_GOAL_SIZE_MBYTES_DEFAULT_VALUE;
+        if (properties == null || properties.isEmpty()) {
+            return goalSizeMbytes;
+        }
+        if (properties.containsKey(PROPERTIES_TIME_SERIES_COMPACTION_GOAL_SIZE_MBYTES)) {
+            String goalSizeMbytesStr = properties.get(PROPERTIES_TIME_SERIES_COMPACTION_GOAL_SIZE_MBYTES);
+            properties.remove(PROPERTIES_TIME_SERIES_COMPACTION_GOAL_SIZE_MBYTES);
+            try {
+                goalSizeMbytes = Long.parseLong(goalSizeMbytesStr);
+                if (goalSizeMbytes < 10) {
+                    throw new AnalysisException("time_series_compaction_goal_size_mbytes can not be"
+                            + " less than 10: " + goalSizeMbytesStr);
+                }
+            } catch (NumberFormatException e) {
+                throw new AnalysisException("Invalid time_series_compaction_goal_size_mbytes format: "
+                        + goalSizeMbytesStr);
+            }
+        }
+        return goalSizeMbytes;
+    }
+
+    public static long analyzeTimeSeriesCompactionFileCountThreshold(Map<String, String> properties)
+            throws AnalysisException {
+        long fileCountThreshold = TIME_SERIES_COMPACTION_FILE_COUNT_THRESHOLD_DEFAULT_VALUE;
+        if (properties == null || properties.isEmpty()) {
+            return fileCountThreshold;
+        }
+        if (properties.containsKey(PROPERTIES_TIME_SERIES_COMPACTION_FILE_COUNT_THRESHOLD)) {
+            String fileCountThresholdStr = properties
+                                            .get(PROPERTIES_TIME_SERIES_COMPACTION_FILE_COUNT_THRESHOLD);
+            properties.remove(PROPERTIES_TIME_SERIES_COMPACTION_FILE_COUNT_THRESHOLD);
+            try {
+                fileCountThreshold = Long.parseLong(fileCountThresholdStr);
+                if (fileCountThreshold < 10) {
+                    throw new AnalysisException("time_series_compaction_file_count_threshold can not be "
+                            + "less than 10: " + fileCountThresholdStr);
+                }
+            } catch (NumberFormatException e) {
+                throw new AnalysisException("Invalid time_series_compaction_file_count_threshold format: "
+                        + fileCountThresholdStr);
+            }
+        }
+        return fileCountThreshold;
+    }
+
+    public static long analyzeTimeSeriesCompactionTimeThresholdSeconds(Map<String, String> properties)
+            throws AnalysisException {
+        long timeThresholdSeconds = TIME_SERIES_COMPACTION_TIME_THRESHOLD_SECONDS_DEFAULT_VALUE;
+        if (properties == null || properties.isEmpty()) {
+            return timeThresholdSeconds;
+        }
+        if (properties.containsKey(PROPERTIES_TIME_SERIES_COMPACTION_TIME_THRESHOLD_SECONDS)) {
+            String timeThresholdSecondsStr = properties.get(PROPERTIES_TIME_SERIES_COMPACTION_TIME_THRESHOLD_SECONDS);
+            properties.remove(PROPERTIES_TIME_SERIES_COMPACTION_TIME_THRESHOLD_SECONDS);
+            try {
+                timeThresholdSeconds = Long.parseLong(timeThresholdSecondsStr);
+                if (timeThresholdSeconds < 60) {
+                    throw new AnalysisException("time_series_compaction_time_threshold_seconds can not be"
+                            + " less than 60: " + timeThresholdSecondsStr);
+                }
+            } catch (NumberFormatException e) {
+                throw new AnalysisException("Invalid time_series_compaction_time_threshold_seconds format: "
+                        + timeThresholdSecondsStr);
+            }
+        }
+        return timeThresholdSeconds;
+    }
+
     // analyzeCompressionType will parse the compression type from properties
     public static TCompressionType analyzeCompressionType(Map<String, String> 
properties) throws AnalysisException {
         String compressionType = "";
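
The analyze* helpers added above are meant to be applied once to the table's property map: each call consumes the key it recognizes, validates the value (throwing AnalysisException on bad input), and otherwise falls back to the DEFAULT_VALUE constants declared earlier. A hedged sketch of that flow, mirroring what InternalCatalog does further down in this commit (the map contents and variable names here are illustrative only):

    // illustrative usage of the new PropertyAnalyzer helpers; may throw AnalysisException
    Map<String, String> properties = new HashMap<>();
    properties.put("compaction_policy", "time_series");
    properties.put("time_series_compaction_goal_size_mbytes", "2048");

    String compactionPolicy = PropertyAnalyzer.analyzeCompactionPolicy(properties);                           // "time_series"
    long goalSizeMbytes = PropertyAnalyzer.analyzeTimeSeriesCompactionGoalSizeMbytes(properties);             // 2048
    long fileCountThreshold = PropertyAnalyzer.analyzeTimeSeriesCompactionFileCountThreshold(properties);     // default 2000
    long timeThresholdSeconds = PropertyAnalyzer.analyzeTimeSeriesCompactionTimeThresholdSeconds(properties); // default 3600
    // the recognized keys have been removed from the map; anything left over can be rejected as unknown
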
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
index 6c1079688d..46c89df7ba 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
@@ -1434,6 +1434,21 @@ public class InternalCatalog implements 
CatalogIf<Database> {
                 
properties.put(PropertyAnalyzer.PROPERTIES_SKIP_WRITE_INDEX_ON_LOAD,
                         olapTable.skipWriteIndexOnLoad().toString());
             }
+            if 
(!properties.containsKey(PropertyAnalyzer.PROPERTIES_COMPACTION_POLICY)) {
+                properties.put(PropertyAnalyzer.PROPERTIES_COMPACTION_POLICY, 
olapTable.getCompactionPolicy());
+            }
+            if 
(!properties.containsKey(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_GOAL_SIZE_MBYTES))
 {
+                
properties.put(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_GOAL_SIZE_MBYTES,
+                                                
olapTable.getTimeSeriesCompactionGoalSizeMbytes().toString());
+            }
+            if 
(!properties.containsKey(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_FILE_COUNT_THRESHOLD))
 {
+                
properties.put(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_FILE_COUNT_THRESHOLD,
+                                                
olapTable.getTimeSeriesCompactionFileCountThreshold().toString());
+            }
+            if 
(!properties.containsKey(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_TIME_THRESHOLD_SECONDS))
 {
+                
properties.put(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_TIME_THRESHOLD_SECONDS,
+                                                
olapTable.getTimeSeriesCompactionTimeThresholdSeconds().toString());
+            }
             if 
(!properties.containsKey(PropertyAnalyzer.PROPERTIES_DYNAMIC_SCHEMA)) {
                 properties.put(PropertyAnalyzer.PROPERTIES_DYNAMIC_SCHEMA,
                         olapTable.isDynamicSchema().toString());
@@ -1530,7 +1545,11 @@ public class InternalCatalog implements 
CatalogIf<Database> {
                     singlePartitionDesc.getTabletType(), 
olapTable.getCompressionType(), olapTable.getDataSortInfo(),
                     olapTable.getEnableUniqueKeyMergeOnWrite(), storagePolicy, 
idGeneratorBuffer,
                     olapTable.disableAutoCompaction(), 
olapTable.enableSingleReplicaCompaction(),
-                    olapTable.skipWriteIndexOnLoad(), 
olapTable.storeRowColumn(), olapTable.isDynamicSchema(),
+                    olapTable.skipWriteIndexOnLoad(), 
olapTable.getCompactionPolicy(),
+                    olapTable.getTimeSeriesCompactionGoalSizeMbytes(),
+                    olapTable.getTimeSeriesCompactionFileCountThreshold(),
+                    olapTable.getTimeSeriesCompactionTimeThresholdSeconds(),
+                    olapTable.storeRowColumn(), olapTable.isDynamicSchema(),
                     binlogConfig, dataProperty.isStorageMediumSpecified());
 
             // check again
@@ -1758,6 +1777,8 @@ public class InternalCatalog implements 
CatalogIf<Database> {
             DataSortInfo dataSortInfo, boolean enableUniqueKeyMergeOnWrite, 
String storagePolicy,
             IdGeneratorBuffer idGeneratorBuffer, boolean disableAutoCompaction,
             boolean enableSingleReplicaCompaction, boolean 
skipWriteIndexOnLoad,
+            String compactionPolicy, Long timeSeriesCompactionGoalSizeMbytes,
+            Long timeSeriesCompactionFileCountThreshold, Long 
timeSeriesCompactionTimeThresholdSeconds,
             boolean storeRowColumn, boolean isDynamicSchema, BinlogConfig 
binlogConfig,
             boolean isStorageMediumSpecified) throws DdlException {
         // create base index first.
@@ -1822,6 +1843,8 @@ public class InternalCatalog implements 
CatalogIf<Database> {
                             storageMedium, schema, bfColumns, bfFpp, 
countDownLatch, indexes, isInMemory, tabletType,
                             dataSortInfo, compressionType, 
enableUniqueKeyMergeOnWrite, storagePolicy,
                             disableAutoCompaction, 
enableSingleReplicaCompaction, skipWriteIndexOnLoad,
+                            compactionPolicy, 
timeSeriesCompactionGoalSizeMbytes,
+                            timeSeriesCompactionFileCountThreshold, 
timeSeriesCompactionTimeThresholdSeconds,
                             storeRowColumn, isDynamicSchema, binlogConfig);
 
                     task.setStorageFormat(storageFormat);
@@ -1982,6 +2005,56 @@ public class InternalCatalog implements 
CatalogIf<Database> {
         // use light schema change optimization
         olapTable.setDisableAutoCompaction(disableAutoCompaction);
 
+        // set compaction policy
+        String compactionPolicy = 
PropertyAnalyzer.SIZE_BASED_COMPACTION_POLICY;
+        try {
+            compactionPolicy = 
PropertyAnalyzer.analyzeCompactionPolicy(properties);
+        } catch (AnalysisException e) {
+            throw new DdlException(e.getMessage());
+        }
+        olapTable.setCompactionPolicy(compactionPolicy);
+
+        if (!compactionPolicy.equals(PropertyAnalyzer.TIME_SERIES_COMPACTION_POLICY)
+                && (properties.containsKey(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_GOAL_SIZE_MBYTES)
+                || properties.containsKey(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_FILE_COUNT_THRESHOLD)
+                || properties
+                        .containsKey(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_TIME_THRESHOLD_SECONDS))) {
+            throw new DdlException("time series compaction config requires compaction_policy = time_series");
+        }
+
+        // set time series compaction goal size
+        long timeSeriesCompactionGoalSizeMbytes
+                                    = 
PropertyAnalyzer.TIME_SERIES_COMPACTION_GOAL_SIZE_MBYTES_DEFAULT_VALUE;
+        try {
+            timeSeriesCompactionGoalSizeMbytes = PropertyAnalyzer
+                                        
.analyzeTimeSeriesCompactionGoalSizeMbytes(properties);
+        } catch (AnalysisException e) {
+            throw new DdlException(e.getMessage());
+        }
+        
olapTable.setTimeSeriesCompactionGoalSizeMbytes(timeSeriesCompactionGoalSizeMbytes);
+
+        // set time series compaction file count threshold
+        long timeSeriesCompactionFileCountThreshold
+                                    = 
PropertyAnalyzer.TIME_SERIES_COMPACTION_FILE_COUNT_THRESHOLD_DEFAULT_VALUE;
+        try {
+            timeSeriesCompactionFileCountThreshold = PropertyAnalyzer
+                                    
.analyzeTimeSeriesCompactionFileCountThreshold(properties);
+        } catch (AnalysisException e) {
+            throw new DdlException(e.getMessage());
+        }
+        
olapTable.setTimeSeriesCompactionFileCountThreshold(timeSeriesCompactionFileCountThreshold);
+
+        // set time series compaction time threshold
+        long timeSeriesCompactionTimeThresholdSeconds
+                                     = 
PropertyAnalyzer.TIME_SERIES_COMPACTION_TIME_THRESHOLD_SECONDS_DEFAULT_VALUE;
+        try {
+            timeSeriesCompactionTimeThresholdSeconds = PropertyAnalyzer
+                                    
.analyzeTimeSeriesCompactionTimeThresholdSeconds(properties);
+        } catch (AnalysisException e) {
+            throw new DdlException(e.getMessage());
+        }
+        
olapTable.setTimeSeriesCompactionTimeThresholdSeconds(timeSeriesCompactionTimeThresholdSeconds);
+
         // get storage format
         TStorageFormat storageFormat = TStorageFormat.V2; // default is 
segment v2
         try {
@@ -2307,6 +2380,9 @@ public class InternalCatalog implements 
CatalogIf<Database> {
                         olapTable.getDataSortInfo(), 
olapTable.getEnableUniqueKeyMergeOnWrite(), storagePolicy,
                         idGeneratorBuffer, olapTable.disableAutoCompaction(),
                         olapTable.enableSingleReplicaCompaction(), 
skipWriteIndexOnLoad,
+                        olapTable.getCompactionPolicy(), 
olapTable.getTimeSeriesCompactionGoalSizeMbytes(),
+                        olapTable.getTimeSeriesCompactionFileCountThreshold(),
+                        
olapTable.getTimeSeriesCompactionTimeThresholdSeconds(),
                         storeRowColumn, isDynamicSchema, binlogConfigForTask,
                         
partitionInfo.getDataProperty(partitionId).isStorageMediumSpecified());
                 olapTable.addPartition(partition);
@@ -2373,8 +2449,12 @@ public class InternalCatalog implements 
CatalogIf<Database> {
                             storageFormat, 
partitionInfo.getTabletType(entry.getValue()), compressionType,
                             olapTable.getDataSortInfo(), 
olapTable.getEnableUniqueKeyMergeOnWrite(), storagePolicy,
                             idGeneratorBuffer, 
olapTable.disableAutoCompaction(),
-                            olapTable.enableSingleReplicaCompaction(), 
skipWriteIndexOnLoad, storeRowColumn,
-                            isDynamicSchema, binlogConfigForTask, 
dataProperty.isStorageMediumSpecified());
+                            olapTable.enableSingleReplicaCompaction(), 
skipWriteIndexOnLoad,
+                            olapTable.getCompactionPolicy(), 
olapTable.getTimeSeriesCompactionGoalSizeMbytes(),
+                            
olapTable.getTimeSeriesCompactionFileCountThreshold(),
+                            
olapTable.getTimeSeriesCompactionTimeThresholdSeconds(),
+                            storeRowColumn, isDynamicSchema, 
binlogConfigForTask,
+                            dataProperty.isStorageMediumSpecified());
                     olapTable.addPartition(partition);
                 }
             } else {
@@ -2797,6 +2877,9 @@ public class InternalCatalog implements 
CatalogIf<Database> {
                         
olapTable.getPartitionInfo().getDataProperty(oldPartitionId).getStoragePolicy(),
                         idGeneratorBuffer, olapTable.disableAutoCompaction(),
                         olapTable.enableSingleReplicaCompaction(), 
olapTable.skipWriteIndexOnLoad(),
+                        olapTable.getCompactionPolicy(), 
olapTable.getTimeSeriesCompactionGoalSizeMbytes(),
+                        olapTable.getTimeSeriesCompactionFileCountThreshold(),
+                        
olapTable.getTimeSeriesCompactionTimeThresholdSeconds(),
                         olapTable.storeRowColumn(), 
olapTable.isDynamicSchema(), binlogConfig,
                         
copiedTbl.getPartitionInfo().getDataProperty(oldPartitionId).isStorageMediumSpecified());
                 newPartitions.add(newPartition);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java 
b/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java
index 17be05444e..055652a0b9 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java
@@ -809,7 +809,10 @@ public class ReportHandler extends Daemon {
                                             
olapTable.getEnableUniqueKeyMergeOnWrite(), olapTable.getStoragePolicy(),
                                             olapTable.disableAutoCompaction(),
                                             
olapTable.enableSingleReplicaCompaction(),
-                                            olapTable.skipWriteIndexOnLoad(),
+                                            olapTable.skipWriteIndexOnLoad(), olapTable.getCompactionPolicy(),
+                                            olapTable.getTimeSeriesCompactionGoalSizeMbytes(),
+                                            olapTable.getTimeSeriesCompactionFileCountThreshold(),
+                                            olapTable.getTimeSeriesCompactionTimeThresholdSeconds(),
                                             olapTable.storeRowColumn(), 
olapTable.isDynamicSchema(),
                                             binlogConfig);
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/task/CreateReplicaTask.java 
b/fe/fe-core/src/main/java/org/apache/doris/task/CreateReplicaTask.java
index 49bd33fb5c..f654c16784 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/task/CreateReplicaTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/task/CreateReplicaTask.java
@@ -105,6 +105,14 @@ public class CreateReplicaTask extends AgentTask {
 
     private boolean skipWriteIndexOnLoad;
 
+    private String compactionPolicy;
+
+    private long timeSeriesCompactionGoalSizeMbytes;
+
+    private long timeSeriesCompactionFileCountThreshold;
+
+    private long timeSeriesCompactionTimeThresholdSeconds;
+
     private boolean storeRowColumn;
 
     private BinlogConfig binlogConfig;
@@ -123,6 +131,10 @@ public class CreateReplicaTask extends AgentTask {
                              String storagePolicy, boolean 
disableAutoCompaction,
                              boolean enableSingleReplicaCompaction,
                              boolean skipWriteIndexOnLoad,
+                             String compactionPolicy,
+                             long timeSeriesCompactionGoalSizeMbytes,
+                             long timeSeriesCompactionFileCountThreshold,
+                             long timeSeriesCompactionTimeThresholdSeconds,
                              boolean storeRowColumn,
                              boolean isDynamicSchema,
                              BinlogConfig binlogConfig) {
@@ -162,6 +174,10 @@ public class CreateReplicaTask extends AgentTask {
         this.disableAutoCompaction = disableAutoCompaction;
         this.enableSingleReplicaCompaction = enableSingleReplicaCompaction;
         this.skipWriteIndexOnLoad = skipWriteIndexOnLoad;
+        this.compactionPolicy = compactionPolicy;
+        this.timeSeriesCompactionGoalSizeMbytes = 
timeSeriesCompactionGoalSizeMbytes;
+        this.timeSeriesCompactionFileCountThreshold = 
timeSeriesCompactionFileCountThreshold;
+        this.timeSeriesCompactionTimeThresholdSeconds = 
timeSeriesCompactionTimeThresholdSeconds;
         this.storeRowColumn = storeRowColumn;
         this.binlogConfig = binlogConfig;
     }
@@ -270,7 +286,6 @@ public class CreateReplicaTask extends AgentTask {
         tSchema.setDisableAutoCompaction(disableAutoCompaction);
         
tSchema.setEnableSingleReplicaCompaction(enableSingleReplicaCompaction);
         tSchema.setSkipWriteIndexOnLoad(skipWriteIndexOnLoad);
-        tSchema.setSkipWriteIndexOnLoad(skipWriteIndexOnLoad);
         tSchema.setStoreRowColumn(storeRowColumn);
         tSchema.setIsDynamicSchema(isDynamicSchema);
         createTabletReq.setTabletSchema(tSchema);
@@ -300,6 +315,10 @@ public class CreateReplicaTask extends AgentTask {
         createTabletReq.setTabletType(tabletType);
         createTabletReq.setCompressionType(compressionType);
         
createTabletReq.setEnableUniqueKeyMergeOnWrite(enableUniqueKeyMergeOnWrite);
+        createTabletReq.setCompactionPolicy(compactionPolicy);
+        
createTabletReq.setTimeSeriesCompactionGoalSizeMbytes(timeSeriesCompactionGoalSizeMbytes);
+        
createTabletReq.setTimeSeriesCompactionFileCountThreshold(timeSeriesCompactionFileCountThreshold);
+        
createTabletReq.setTimeSeriesCompactionTimeThresholdSeconds(timeSeriesCompactionTimeThresholdSeconds);
 
         if (binlogConfig != null) {
             createTabletReq.setBinlogConfig(binlogConfig.toThrift());
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/task/UpdateTabletMetaInfoTask.java 
b/fe/fe-core/src/main/java/org/apache/doris/task/UpdateTabletMetaInfoTask.java
index 29ad3ce199..6b12c34e0d 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/task/UpdateTabletMetaInfoTask.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/task/UpdateTabletMetaInfoTask.java
@@ -21,6 +21,7 @@ import org.apache.doris.catalog.BinlogConfig;
 import org.apache.doris.common.MarkedCountDownLatch;
 import org.apache.doris.common.Pair;
 import org.apache.doris.common.Status;
+import org.apache.doris.common.util.PropertyAnalyzer;
 import org.apache.doris.thrift.TStatusCode;
 import org.apache.doris.thrift.TTabletMetaInfo;
 import org.apache.doris.thrift.TTaskType;
@@ -30,6 +31,7 @@ import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
 import java.util.List;
+import java.util.Map;
 import java.util.Random;
 import java.util.Set;
 
@@ -44,6 +46,8 @@ public class UpdateTabletMetaInfoTask extends AgentTask {
     private int inMemory = -1; // < 0 means not to update inMemory property, > 
0 means true, == 0 means false
     private long storagePolicyId = -1; // < 0 means not to update storage 
policy, == 0 means to reset storage policy
     private BinlogConfig binlogConfig = null; // null means not to update 
binlog config
+    private String compactionPolicy = null; // null means not to update 
compaction policy
+    private Map<String, Long> timeSeriesCompactionConfig = null; // null means 
not to update compaction policy config
     // For ReportHandler
     private List<TTabletMetaInfo> tabletMetaInfos;
 
@@ -72,6 +76,22 @@ public class UpdateTabletMetaInfoTask extends AgentTask {
         this.tabletMetaInfos = tabletMetaInfos;
     }
 
+    public UpdateTabletMetaInfoTask(long backendId,
+                                    Set<Pair<Long, Integer>> 
tableIdWithSchemaHash,
+                                    int inMemory, long storagePolicyId,
+                                    BinlogConfig binlogConfig,
+                                    MarkedCountDownLatch<Long, Set<Pair<Long, 
Integer>>> latch,
+                                    String compactionPolicy,
+                                    Map<String, Long> 
timeSeriesCompactionConfig) {
+        this(backendId, tableIdWithSchemaHash);
+        this.storagePolicyId = storagePolicyId;
+        this.inMemory = inMemory;
+        this.binlogConfig = binlogConfig;
+        this.latch = latch;
+        this.compactionPolicy = compactionPolicy;
+        this.timeSeriesCompactionConfig = timeSeriesCompactionConfig;
+    }
+
     public void countDownLatch(long backendId, Set<Pair<Long, Integer>> 
tablets) {
         if (this.latch != null) {
             if (latch.markedCountDown(backendId, tablets)) {
@@ -110,6 +130,26 @@ public class UpdateTabletMetaInfoTask extends AgentTask {
                 if (binlogConfig != null) {
                     metaInfo.setBinlogConfig(binlogConfig.toThrift());
                 }
+                if (compactionPolicy != null) {
+                    metaInfo.setCompactionPolicy(compactionPolicy);
+                }
+                if (timeSeriesCompactionConfig != null && 
!timeSeriesCompactionConfig.isEmpty()) {
+                    if (timeSeriesCompactionConfig
+                            
.containsKey(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_GOAL_SIZE_MBYTES))
 {
+                        
metaInfo.setTimeSeriesCompactionGoalSizeMbytes(timeSeriesCompactionConfig
+                                    
.get(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_GOAL_SIZE_MBYTES));
+                    }
+                    if (timeSeriesCompactionConfig
+                            
.containsKey(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_FILE_COUNT_THRESHOLD))
 {
+                        
metaInfo.setTimeSeriesCompactionFileCountThreshold(timeSeriesCompactionConfig
+                                        
.get(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_FILE_COUNT_THRESHOLD));
+                    }
+                    if (timeSeriesCompactionConfig
+                            
.containsKey(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_TIME_THRESHOLD_SECONDS))
 {
+                        
metaInfo.setTimeSeriesCompactionTimeThresholdSeconds(timeSeriesCompactionConfig
+                                    
.get(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_TIME_THRESHOLD_SECONDS));
+                    }
+                }
                 updateTabletMetaInfoReq.addToTabletMetaInfos(metaInfo);
             }
         } else {
diff --git a/fe/fe-core/src/test/java/org/apache/doris/task/AgentTaskTest.java 
b/fe/fe-core/src/test/java/org/apache/doris/task/AgentTaskTest.java
index cf19d7d918..86d3fda79f 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/task/AgentTaskTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/task/AgentTaskTest.java
@@ -107,7 +107,7 @@ public class AgentTaskTest {
         createReplicaTask = new CreateReplicaTask(backendId1, dbId, tableId, 
partitionId,
                 indexId1, tabletId1, replicaId1, shortKeyNum, schemaHash1, 
version, KeysType.AGG_KEYS, storageType,
                 TStorageMedium.SSD, columns, null, 0, latch, null, false, 
TTabletType.TABLET_TYPE_DISK, null,
-                TCompressionType.LZ4F, false, "", false, false, false, false, false, null);
+                TCompressionType.LZ4F, false, "", false, false, false, "", 0, 0, 0, false, false, null);
 
         // drop
         dropTask = new DropReplicaTask(backendId1, tabletId1, replicaId1, 
schemaHash1, false);
diff --git a/gensrc/proto/olap_file.proto b/gensrc/proto/olap_file.proto
index fe894dba9d..7c0c83a4a0 100644
--- a/gensrc/proto/olap_file.proto
+++ b/gensrc/proto/olap_file.proto
@@ -311,6 +311,10 @@ message TabletMetaPB {
     optional int64 storage_policy_id = 25;
     optional PUniqueId cooldown_meta_id = 26;
     optional BinlogConfigPB binlog_config = 27;
+    optional string compaction_policy = 28 [default = "size_based"];
+    optional int64 time_series_compaction_goal_size_mbytes = 29 [default = 1024];
+    optional int64 time_series_compaction_file_count_threshold = 30 [default = 2000];
+    optional int64 time_series_compaction_time_threshold_seconds = 31 [default = 3600];
 }
 
 message OLAPRawDeltaHeaderMessage {
diff --git a/gensrc/thrift/AgentService.thrift b/gensrc/thrift/AgentService.thrift
index f92f873fc4..e5a94cbaba 100644
--- a/gensrc/thrift/AgentService.thrift
+++ b/gensrc/thrift/AgentService.thrift
@@ -141,6 +141,10 @@ struct TCreateTabletReq {
     19: optional bool enable_unique_key_merge_on_write = false
     20: optional i64 storage_policy_id
     21: optional TBinlogConfig binlog_config
+    22: optional string compaction_policy = "size_based"
+    23: optional i64 time_series_compaction_goal_size_mbytes = 1024
+    24: optional i64 time_series_compaction_file_count_threshold = 2000
+    25: optional i64 time_series_compaction_time_threshold_seconds = 3600
 }
 
 struct TDropTabletReq {
@@ -401,6 +405,10 @@ struct TTabletMetaInfo {
     7: optional i64 storage_policy_id
     8: optional Types.TReplicaId replica_id
     9: optional TBinlogConfig binlog_config
+    10: optional string compaction_policy
+    11: optional i64 time_series_compaction_goal_size_mbytes
+    12: optional i64 time_series_compaction_file_count_threshold
+    13: optional i64 time_series_compaction_time_threshold_seconds
 }
 
 struct TUpdateTabletMetaInfoReq {
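
Because the new TCreateTabletReq fields carry defaults (size_based, 1024, 2000, 3600), only
tables that opt into the time-series policy need to set them explicitly. A minimal FE-side
sketch follows, assuming the usual thrift-generated camelCase setters; it is not code from
this patch.

    // Minimal sketch, assuming thrift-generated setters; not code from this patch.
    // Fields left unset fall back to the defaults declared in the struct above.
    TCreateTabletReq createTabletReq = new TCreateTabletReq();
    createTabletReq.setCompactionPolicy("time_series");
    createTabletReq.setTimeSeriesCompactionGoalSizeMbytes(2048L);
    createTabletReq.setTimeSeriesCompactionFileCountThreshold(5000L);
    createTabletReq.setTimeSeriesCompactionTimeThresholdSeconds(86400L);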
diff --git a/regression-test/suites/compaction/test_table_level_compaction_policy.groovy b/regression-test/suites/compaction/test_table_level_compaction_policy.groovy
new file mode 100644
index 0000000000..838d8b34dc
--- /dev/null
+++ b/regression-test/suites/compaction/test_table_level_compaction_policy.groovy
@@ -0,0 +1,207 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.codehaus.groovy.runtime.IOGroovyMethods
+
+suite("test_table_level_compaction_policy") {
+    def tableName = "test_table_level_compaction_policy"
+    sql """ DROP TABLE IF EXISTS ${tableName} """
+
+    sql """
+            CREATE TABLE ${tableName} (
+                    `c_custkey` int(11) NOT NULL COMMENT "",
+                    `c_name` varchar(26) NOT NULL COMMENT "",
+                    `c_address` varchar(41) NOT NULL COMMENT "",
+                    `c_city` varchar(11) NOT NULL COMMENT ""
+            )
+            DUPLICATE KEY (`c_custkey`)
+            DISTRIBUTED BY HASH(`c_custkey`) BUCKETS 1
+            PROPERTIES (
+                    "replication_num" = "1",
+                    "compaction_policy" = "time_series",
+                    "time_series_compaction_goal_size_mbytes" = "2048", 
+                    "time_series_compaction_file_count_threshold" = "5000",
+                    "time_series_compaction_time_threshold_seconds" = "86400"
+             );
+        """
+    result = sql """show create table ${tableName}"""
+    logger.info("${result}")
+    assertTrue(result.toString().containsIgnoreCase('"compaction_policy" = "time_series"'))
+    assertTrue(result.toString().containsIgnoreCase('"time_series_compaction_goal_size_mbytes" = "2048"'))
+    assertTrue(result.toString().containsIgnoreCase('"time_series_compaction_file_count_threshold" = "5000"'))
+    assertTrue(result.toString().containsIgnoreCase('"time_series_compaction_time_threshold_seconds" = "86400"'))
+
+    sql """
+        alter table ${tableName} set ("time_series_compaction_goal_size_mbytes" = "1024")
+        """
+
+    result = sql """show create table ${tableName}"""
+    logger.info("${result}")
+    assertTrue(result.toString().containsIgnoreCase('"time_series_compaction_goal_size_mbytes" = "1024"'))
+
+    sql """
+        alter table ${tableName} set ("time_series_compaction_file_count_threshold" = "6000")
+        """
+
+    result = sql """show create table ${tableName}"""
+    logger.info("${result}")
+    assertTrue(result.toString().containsIgnoreCase('"time_series_compaction_file_count_threshold" = "6000"'))
+
+     sql """
+        alter table ${tableName} set ("time_series_compaction_time_threshold_seconds" = "3000")
+        """
+
+    result = sql """show create table ${tableName}"""
+    logger.info("${result}")
+    assertTrue(result.toString().containsIgnoreCase('"time_series_compaction_time_threshold_seconds" = "3000"'))
+
+    sql """ DROP TABLE IF EXISTS ${tableName} """
+
+    sql """
+            CREATE TABLE ${tableName} (
+                    `c_custkey` int(11) NOT NULL COMMENT "",
+                    `c_name` varchar(26) NOT NULL COMMENT "",
+                    `c_address` varchar(41) NOT NULL COMMENT "",
+                    `c_city` varchar(11) NOT NULL COMMENT ""
+            )
+            DUPLICATE KEY (`c_custkey`)
+            DISTRIBUTED BY HASH(`c_custkey`) BUCKETS 1
+            PROPERTIES (
+                    "replication_num" = "1"
+             );
+        """
+    result = sql """show create table ${tableName}"""
+    logger.info("${result}")
+    assertFalse(result.toString().containsIgnoreCase('"compaction_policy"'))
+
+    sql """ DROP TABLE IF EXISTS ${tableName} """
+
+    test {
+        sql """
+            CREATE TABLE ${tableName} (
+                    `c_custkey` int(11) NOT NULL COMMENT "",
+                    `c_name` varchar(26) NOT NULL COMMENT "",
+                    `c_address` varchar(41) NOT NULL COMMENT "",
+                    `c_city` varchar(11) NOT NULL COMMENT ""
+            )
+            DUPLICATE KEY (`c_custkey`)
+            DISTRIBUTED BY HASH(`c_custkey`) BUCKETS 1
+            PROPERTIES (
+                    "replication_num" = "1",
+                    "compaction_policy" = "time_series",
+                    "time_series_compaction_goal_size_mbytes" = "5"
+             );
+        """
+        exception "time_series_compaction_goal_size_mbytes can not be less 
than 10: 5"
+    }
+
+    test {
+        sql """
+            CREATE TABLE ${tableName} (
+                    `c_custkey` int(11) NOT NULL COMMENT "",
+                    `c_name` varchar(26) NOT NULL COMMENT "",
+                    `c_address` varchar(41) NOT NULL COMMENT "",
+                    `c_city` varchar(11) NOT NULL COMMENT ""
+            )
+            DUPLICATE KEY (`c_custkey`)
+            DISTRIBUTED BY HASH(`c_custkey`) BUCKETS 1
+            PROPERTIES (
+                    "replication_num" = "1",
+                    "compaction_policy" = "time_series",
+                    "time_series_compaction_file_count_threshold" = "5"
+             );
+        """
+        exception "time_series_compaction_file_count_threshold can not be less 
than 10: 5"
+    }
+
+    test {
+        sql """
+            CREATE TABLE ${tableName} (
+                    `c_custkey` int(11) NOT NULL COMMENT "",
+                    `c_name` varchar(26) NOT NULL COMMENT "",
+                    `c_address` varchar(41) NOT NULL COMMENT "",
+                    `c_city` varchar(11) NOT NULL COMMENT ""
+            )
+            DUPLICATE KEY (`c_custkey`)
+            DISTRIBUTED BY HASH(`c_custkey`) BUCKETS 1
+            PROPERTIES (
+                    "replication_num" = "1",
+                    "compaction_policy" = "time_series",
+                    "time_series_compaction_time_threshold_seconds" = "5"
+             );
+        """
+        exception "time_series_compaction_time_threshold_seconds can not be 
less than 60: 5"
+    }
+
+    test {
+        sql """
+            CREATE TABLE ${tableName} (
+                    `c_custkey` int(11) NOT NULL COMMENT "",
+                    `c_name` varchar(26) NOT NULL COMMENT "",
+                    `c_address` varchar(41) NOT NULL COMMENT "",
+                    `c_city` varchar(11) NOT NULL COMMENT ""
+            )
+            DUPLICATE KEY (`c_custkey`)
+            DISTRIBUTED BY HASH(`c_custkey`) BUCKETS 1
+            PROPERTIES (
+                    "replication_num" = "1",
+                    "compaction_policy" = "ok"
+             );
+        """
+        exception "compaction_policy must be time_series or size_based"
+    }
+
+    test {
+        sql """
+            CREATE TABLE ${tableName} (
+                    `c_custkey` int(11) NOT NULL COMMENT "",
+                    `c_name` varchar(26) NOT NULL COMMENT "",
+                    `c_address` varchar(41) NOT NULL COMMENT "",
+                    `c_city` varchar(11) NOT NULL COMMENT ""
+            )
+            DUPLICATE KEY (`c_custkey`)
+            DISTRIBUTED BY HASH(`c_custkey`) BUCKETS 1
+            PROPERTIES (
+                    "replication_num" = "1",
+                    "time_series_compaction_goal_size_mbytes" = "2048"
+             );
+        """
+        exception "only time series compaction policy support for time series 
config"
+    }
+
+    test {
+        sql """
+            CREATE TABLE ${tableName} (
+                    `c_custkey` int(11) NOT NULL COMMENT "",
+                    `c_name` varchar(26) NOT NULL COMMENT "",
+                    `c_address` varchar(41) NOT NULL COMMENT "",
+                    `c_city` varchar(11) NOT NULL COMMENT ""
+            )
+            DUPLICATE KEY (`c_custkey`)
+            DISTRIBUTED BY HASH(`c_custkey`) BUCKETS 1
+            PROPERTIES (
+                    "replication_num" = "1"
+             );
+        """
+
+        sql """
+            alter table  ${tableName} set ("compaction_policy" = "ok")
+            """
+        exception "Table compaction policy only support for time_series or 
size_based"
+    }
+    sql """ DROP TABLE IF EXISTS ${tableName} """
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
