This is an automated email from the ASF dual-hosted git repository.
kxiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new c35a415fbdc [opt](invert index) add level to time series compaction
policy (#31488)
c35a415fbdc is described below
commit c35a415fbdcc1cf815c47c6082bf0f459f281c91
Author: zzzxl <[email protected]>
AuthorDate: Thu Mar 7 11:13:09 2024 +0800
[opt](invert index) add level to time series compaction policy (#31488)
---
be/src/agent/task_worker_pool.cpp | 23 +++--
be/src/cloud/pb_convert.cpp | 4 +
be/src/olap/cumulative_compaction.cpp | 3 +
be/src/olap/cumulative_compaction_policy.h | 8 ++
.../cumulative_compaction_time_series_policy.cpp | 108 +++++++++++++++++++--
.../cumulative_compaction_time_series_policy.h | 4 +
be/src/olap/full_compaction.cpp | 3 +
be/src/olap/rowset/rowset_meta.h | 6 ++
be/src/olap/tablet_meta.cpp | 17 +++-
be/src/olap/tablet_meta.h | 10 +-
.../Create/CREATE-TABLE.md | 9 ++
.../Create/CREATE-TABLE.md | 8 ++
.../main/java/org/apache/doris/alter/Alter.java | 4 +-
.../java/org/apache/doris/alter/RollupJobV2.java | 1 +
.../apache/doris/alter/SchemaChangeHandler.java | 7 ++
.../org/apache/doris/alter/SchemaChangeJobV2.java | 1 +
.../analysis/ModifyTablePropertiesClause.java | 17 ++++
.../java/org/apache/doris/backup/RestoreJob.java | 1 +
.../main/java/org/apache/doris/catalog/Env.java | 11 ++-
.../java/org/apache/doris/catalog/OlapTable.java | 14 +++
.../org/apache/doris/catalog/TableProperty.java | 18 +++-
.../apache/doris/common/util/PropertyAnalyzer.java | 28 ++++++
.../apache/doris/datasource/InternalCatalog.java | 20 +++-
.../org/apache/doris/master/ReportHandler.java | 1 +
.../org/apache/doris/task/CreateReplicaTask.java | 5 +
.../doris/task/UpdateTabletMetaInfoTask.java | 5 +
.../java/org/apache/doris/task/AgentTaskTest.java | 2 +-
gensrc/proto/olap_file.proto | 3 +
gensrc/thrift/AgentService.thrift | 2 +
29 files changed, 320 insertions(+), 23 deletions(-)
diff --git a/be/src/agent/task_worker_pool.cpp
b/be/src/agent/task_worker_pool.cpp
index af4eade1b3c..2d01d54fa98 100644
--- a/be/src/agent/task_worker_pool.cpp
+++ b/be/src/agent/task_worker_pool.cpp
@@ -55,6 +55,7 @@
#include "io/fs/local_file_system.h"
#include "io/fs/path.h"
#include "io/fs/s3_file_system.h"
+#include "olap/cumulative_compaction_time_series_policy.h"
#include "olap/data_dir.h"
#include "olap/olap_common.h"
#include "olap/rowset/rowset_meta.h"
@@ -762,8 +763,8 @@ void update_tablet_meta_callback(StorageEngine& engine,
const TAgentTaskRequest&
need_to_save = true;
}
if (tablet_meta_info.__isset.compaction_policy) {
- if (tablet_meta_info.compaction_policy != "size_based" &&
- tablet_meta_info.compaction_policy != "time_series") {
+ if (tablet_meta_info.compaction_policy !=
CUMULATIVE_SIZE_BASED_POLICY &&
+ tablet_meta_info.compaction_policy !=
CUMULATIVE_TIME_SERIES_POLICY) {
status = Status::InvalidArgument(
"invalid compaction policy, only support for
size_based or "
"time_series");
@@ -773,7 +774,7 @@ void update_tablet_meta_callback(StorageEngine& engine,
const TAgentTaskRequest&
need_to_save = true;
}
if (tablet_meta_info.__isset.time_series_compaction_goal_size_mbytes) {
- if (tablet->tablet_meta()->compaction_policy() != "time_series") {
+ if (tablet->tablet_meta()->compaction_policy() !=
CUMULATIVE_TIME_SERIES_POLICY) {
status = Status::InvalidArgument(
"only time series compaction policy support time
series config");
continue;
@@ -783,7 +784,7 @@ void update_tablet_meta_callback(StorageEngine& engine,
const TAgentTaskRequest&
need_to_save = true;
}
if
(tablet_meta_info.__isset.time_series_compaction_file_count_threshold) {
- if (tablet->tablet_meta()->compaction_policy() != "time_series") {
+ if (tablet->tablet_meta()->compaction_policy() !=
CUMULATIVE_TIME_SERIES_POLICY) {
status = Status::InvalidArgument(
"only time series compaction policy support time
series config");
continue;
@@ -793,7 +794,7 @@ void update_tablet_meta_callback(StorageEngine& engine,
const TAgentTaskRequest&
need_to_save = true;
}
if
(tablet_meta_info.__isset.time_series_compaction_time_threshold_seconds) {
- if (tablet->tablet_meta()->compaction_policy() != "time_series") {
+ if (tablet->tablet_meta()->compaction_policy() !=
CUMULATIVE_TIME_SERIES_POLICY) {
status = Status::InvalidArgument(
"only time series compaction policy support time
series config");
continue;
@@ -803,7 +804,7 @@ void update_tablet_meta_callback(StorageEngine& engine,
const TAgentTaskRequest&
need_to_save = true;
}
if
(tablet_meta_info.__isset.time_series_compaction_empty_rowsets_threshold) {
- if (tablet->tablet_meta()->compaction_policy() != "time_series") {
+ if (tablet->tablet_meta()->compaction_policy() !=
CUMULATIVE_TIME_SERIES_POLICY) {
status = Status::InvalidArgument(
"only time series compaction policy support time
series config");
continue;
@@ -812,6 +813,16 @@ void update_tablet_meta_callback(StorageEngine& engine,
const TAgentTaskRequest&
tablet_meta_info.time_series_compaction_empty_rowsets_threshold);
need_to_save = true;
}
+ if (tablet_meta_info.__isset.time_series_compaction_level_threshold) {
+ if (tablet->tablet_meta()->compaction_policy() !=
CUMULATIVE_TIME_SERIES_POLICY) {
+ status = Status::InvalidArgument(
+ "only time series compaction policy support time
series config");
+ continue;
+ }
+ tablet->tablet_meta()->set_time_series_compaction_level_threshold(
+ tablet_meta_info.time_series_compaction_level_threshold);
+ need_to_save = true;
+ }
if (tablet_meta_info.__isset.replica_id) {
tablet->tablet_meta()->set_replica_id(tablet_meta_info.replica_id);
}
diff --git a/be/src/cloud/pb_convert.cpp b/be/src/cloud/pb_convert.cpp
index 3fdc3678a55..2e064ded817 100644
--- a/be/src/cloud/pb_convert.cpp
+++ b/be/src/cloud/pb_convert.cpp
@@ -427,6 +427,7 @@ void doris_tablet_meta_to_cloud(TabletMetaCloudPB* out,
const TabletMetaPB& in)
in.time_series_compaction_time_threshold_seconds());
out->set_time_series_compaction_empty_rowsets_threshold(
in.time_series_compaction_empty_rowsets_threshold());
+
out->set_time_series_compaction_level_threshold(in.time_series_compaction_level_threshold());
out->set_index_id(in.index_id());
out->set_is_in_memory(in.is_in_memory());
out->set_is_persistent(in.is_persistent());
@@ -492,6 +493,7 @@ void doris_tablet_meta_to_cloud(TabletMetaCloudPB* out,
TabletMetaPB&& in) {
in.time_series_compaction_time_threshold_seconds());
out->set_time_series_compaction_empty_rowsets_threshold(
in.time_series_compaction_empty_rowsets_threshold());
+
out->set_time_series_compaction_level_threshold(in.time_series_compaction_level_threshold());
out->set_index_id(in.index_id());
out->set_is_in_memory(in.is_in_memory());
out->set_is_persistent(in.is_persistent());
@@ -566,6 +568,7 @@ void cloud_tablet_meta_to_doris(TabletMetaPB* out, const
TabletMetaCloudPB& in)
in.time_series_compaction_time_threshold_seconds());
out->set_time_series_compaction_empty_rowsets_threshold(
in.time_series_compaction_empty_rowsets_threshold());
+
out->set_time_series_compaction_level_threshold(in.time_series_compaction_level_threshold());
out->set_index_id(in.index_id());
out->set_is_in_memory(in.is_in_memory());
out->set_is_persistent(in.is_persistent());
@@ -631,6 +634,7 @@ void cloud_tablet_meta_to_doris(TabletMetaPB* out,
TabletMetaCloudPB&& in) {
in.time_series_compaction_time_threshold_seconds());
out->set_time_series_compaction_empty_rowsets_threshold(
in.time_series_compaction_empty_rowsets_threshold());
+
out->set_time_series_compaction_level_threshold(in.time_series_compaction_level_threshold());
out->set_index_id(in.index_id());
out->set_is_in_memory(in.is_in_memory());
out->set_is_persistent(in.is_persistent());
diff --git a/be/src/olap/cumulative_compaction.cpp
b/be/src/olap/cumulative_compaction.cpp
index 9de5837b935..1e0f338da23 100644
--- a/be/src/olap/cumulative_compaction.cpp
+++ b/be/src/olap/cumulative_compaction.cpp
@@ -101,6 +101,9 @@ Status CumulativeCompaction::execute_compact() {
RETURN_IF_ERROR(CompactionMixin::execute_compact());
DCHECK_EQ(_state, CompactionState::SUCCESS);
+
tablet()->cumulative_compaction_policy()->update_compaction_level(tablet(),
_input_rowsets,
+
_output_rowset);
+
tablet()->cumulative_compaction_policy()->update_cumulative_point(
tablet(), _input_rowsets, _output_rowset, _last_delete_version);
VLOG_CRITICAL << "after cumulative compaction, current cumulative point is
"
diff --git a/be/src/olap/cumulative_compaction_policy.h
b/be/src/olap/cumulative_compaction_policy.h
index b9b3bb46c0f..e2e5ca5460d 100644
--- a/be/src/olap/cumulative_compaction_policy.h
+++ b/be/src/olap/cumulative_compaction_policy.h
@@ -96,6 +96,11 @@ public:
int64_t current_cumulative_point,
int64_t* cumulative_point) = 0;
+ // Updates the compaction level of a tablet after a compaction operation.
+ virtual void update_compaction_level(Tablet* tablet,
+ const std::vector<RowsetSharedPtr>&
input_rowsets,
+ RowsetSharedPtr output_rowset) = 0;
+
/// Fetch cumulative policy name
virtual std::string_view name() = 0;
};
@@ -149,6 +154,9 @@ public:
/// Its main policy is calculating the accumulative compaction score after
current cumulative_point in tablet.
uint32_t calc_cumulative_compaction_score(Tablet* tablet) override;
+ void update_compaction_level(Tablet* tablet, const
std::vector<RowsetSharedPtr>& input_rowsets,
+ RowsetSharedPtr output_rowset) override {}
+
std::string_view name() override { return CUMULATIVE_SIZE_BASED_POLICY; }
private:
diff --git a/be/src/olap/cumulative_compaction_time_series_policy.cpp
b/be/src/olap/cumulative_compaction_time_series_policy.cpp
index 6f7cb8e2e35..6c3f949723a 100644
--- a/be/src/olap/cumulative_compaction_time_series_policy.cpp
+++ b/be/src/olap/cumulative_compaction_time_series_policy.cpp
@@ -17,19 +17,24 @@
#include "olap/cumulative_compaction_time_series_policy.h"
+#include <algorithm>
+
#include "common/logging.h"
#include "olap/tablet.h"
+#include "olap/tablet_meta.h"
#include "util/time.h"
namespace doris {
uint32_t
TimeSeriesCumulativeCompactionPolicy::calc_cumulative_compaction_score(Tablet*
tablet) {
uint32_t score = 0;
+ uint32_t level0_score = 0;
bool base_rowset_exist = false;
const int64_t point = tablet->cumulative_layer_point();
- int64_t total_size = 0;
+ int64_t level0_total_size = 0;
RowsetMetaSharedPtr first_meta;
int64_t first_version = INT64_MAX;
+ std::list<RowsetMetaSharedPtr> checked_rs_metas;
// NOTE: tablet._meta_lock is hold
auto& rs_metas = tablet->tablet_meta()->all_rs_metas();
// check the base rowset and collect the rowsets of cumulative part
@@ -47,8 +52,13 @@ uint32_t
TimeSeriesCumulativeCompactionPolicy::calc_cumulative_compaction_score(
continue;
} else {
// collect the rowsets of cumulative part
- total_size += rs_meta->total_disk_size();
score += rs_meta->get_compaction_score();
+ if (rs_meta->compaction_level() == 0) {
+ level0_total_size += rs_meta->total_disk_size();
+ level0_score += rs_meta->get_compaction_score();
+ } else {
+ checked_rs_metas.push_back(rs_meta);
+ }
}
}
@@ -73,21 +83,39 @@ uint32_t
TimeSeriesCumulativeCompactionPolicy::calc_cumulative_compaction_score(
// Condition 1: the size of input files for compaction meets the
requirement of parameter compaction_goal_size
int64_t compaction_goal_size_mbytes =
tablet->tablet_meta()->time_series_compaction_goal_size_mbytes();
- if (total_size >= compaction_goal_size_mbytes * 1024 * 1024) {
+ if (level0_total_size >= compaction_goal_size_mbytes * 1024 * 1024) {
return score;
}
// Condition 2: the number of input files reaches the threshold specified
by parameter compaction_file_count_threshold
- if (score >=
tablet->tablet_meta()->time_series_compaction_file_count_threshold()) {
+ if (level0_score >=
tablet->tablet_meta()->time_series_compaction_file_count_threshold()) {
return score;
}
+ // Condition 3: level1 achieve compaction_goal_size
+ if (tablet->tablet_meta()->time_series_compaction_level_threshold() >= 2) {
+ checked_rs_metas.sort([](const RowsetMetaSharedPtr& a, const
RowsetMetaSharedPtr& b) {
+ return a->version().first < b->version().first;
+ });
+ int32_t rs_meta_count = 0;
+ int64_t continuous_size = 0;
+ for (const auto& rs_meta : checked_rs_metas) {
+ rs_meta_count++;
+ continuous_size += rs_meta->total_disk_size();
+ if (rs_meta_count >= 2) {
+ if (continuous_size >= compaction_goal_size_mbytes * 1024 *
1024) {
+ return score;
+ }
+ }
+ }
+ }
+
int64_t now = UnixMillis();
int64_t last_cumu = tablet->last_cumu_compaction_success_time();
if (last_cumu != 0) {
int64_t cumu_interval = now - last_cumu;
- // Condition 3: the time interval between compactions exceeds the
value specified by parameter _compaction_time_threshold_second
+ // Condition 4: the time interval between compactions exceeds the
value specified by parameter _compaction_time_threshold_second
if (cumu_interval >
(tablet->tablet_meta()->time_series_compaction_time_threshold_seconds() *
1000)) {
return score;
@@ -163,6 +191,13 @@ void
TimeSeriesCumulativeCompactionPolicy::calculate_cumulative_point(
break;
}
+ // upgrade: [0 0 2 1 1 0 0]
+ if (!is_delete &&
tablet->tablet_meta()->time_series_compaction_level_threshold() >= 2 &&
+ rs->compaction_level() == 1) {
+ *ret_cumulative_point = rs->version().first;
+ break;
+ }
+
// include one situation: When the segment is not deleted, and is
singleton delta, and is NONOVERLAPPING, ret_cumulative_point increase
prev_version = rs->version().second;
*ret_cumulative_point = prev_version + 1;
@@ -193,6 +228,9 @@ int
TimeSeriesCumulativeCompactionPolicy::pick_input_rowsets(
return 0;
}
+ int64_t compaction_goal_size_mbytes =
+ tablet->tablet_meta()->time_series_compaction_goal_size_mbytes();
+
int transient_size = 0;
*compaction_score = 0;
input_rowsets->clear();
@@ -231,8 +269,7 @@ int
TimeSeriesCumulativeCompactionPolicy::pick_input_rowsets(
input_rowsets->push_back(rowset);
// Condition 1: the size of input files for compaction meets the
requirement of parameter compaction_goal_size
- if (total_size >=
- (tablet->tablet_meta()->time_series_compaction_goal_size_mbytes()
* 1024 * 1024)) {
+ if (total_size >= (compaction_goal_size_mbytes * 1024 * 1024)) {
if (input_rowsets->size() == 1 &&
!input_rowsets->front()->rowset_meta()->is_segments_overlapping()) {
// Only 1 non-overlapping rowset, skip it
@@ -262,20 +299,47 @@ int
TimeSeriesCumulativeCompactionPolicy::pick_input_rowsets(
return transient_size;
}
+ // Condition 3: level1 achieve compaction_goal_size
+ std::vector<RowsetSharedPtr> level1_rowsets;
+ if (tablet->tablet_meta()->time_series_compaction_level_threshold() >= 2) {
+ int64_t continuous_size = 0;
+ for (const auto& rowset : candidate_rowsets) {
+ const auto& rs_meta = rowset->rowset_meta();
+ if (rs_meta->compaction_level() == 0) {
+ break;
+ }
+ level1_rowsets.push_back(rowset);
+ continuous_size += rs_meta->total_disk_size();
+ if (level1_rowsets.size() >= 2) {
+ if (continuous_size >= compaction_goal_size_mbytes * 1024 *
1024) {
+ input_rowsets->swap(level1_rowsets);
+ return input_rowsets->size();
+ }
+ }
+ }
+ }
+
int64_t now = UnixMillis();
int64_t last_cumu = tablet->last_cumu_compaction_success_time();
if (last_cumu != 0) {
int64_t cumu_interval = now - last_cumu;
- // Condition 3: the time interval between compactions exceeds the
value specified by parameter compaction_time_threshold_second
+ // Condition 4: the time interval between compactions exceeds the
value specified by parameter compaction_time_threshold_second
if (cumu_interval >
(tablet->tablet_meta()->time_series_compaction_time_threshold_seconds() *
1000)) {
+ if
(tablet->tablet_meta()->time_series_compaction_level_threshold() >= 2) {
+ if (input_rowsets->empty() && level1_rowsets.size() >= 2) {
+ input_rowsets->swap(level1_rowsets);
+ return input_rowsets->size();
+ }
+ }
return transient_size;
}
}
input_rowsets->clear();
*compaction_score = 0;
+
return 0;
}
@@ -288,7 +352,35 @@ void
TimeSeriesCumulativeCompactionPolicy::update_cumulative_point(
return;
}
+ if (tablet->tablet_meta()->time_series_compaction_level_threshold() >= 2 &&
+ output_rowset->rowset_meta()->compaction_level() < 2) {
+ return;
+ }
+
tablet->set_cumulative_layer_point(output_rowset->end_version() + 1);
}
+void TimeSeriesCumulativeCompactionPolicy::update_compaction_level(
+ Tablet* tablet, const std::vector<RowsetSharedPtr>& input_rowsets,
+ RowsetSharedPtr output_rowset) {
+ if (tablet->tablet_state() != TABLET_RUNNING ||
output_rowset->num_segments() == 0) {
+ return;
+ }
+
+ int64_t first_level = 0;
+ for (size_t i = 0; i < input_rowsets.size(); i++) {
+ int64_t cur_level =
input_rowsets[i]->rowset_meta()->compaction_level();
+ if (i == 0) {
+ first_level = cur_level;
+ } else {
+ if (first_level != cur_level) {
+ LOG(ERROR) << "Failed to check compaction level, first_level:
" << first_level
+ << ", cur_level: " << cur_level;
+ }
+ }
+ }
+
+ output_rowset->rowset_meta()->set_compaction_level(first_level + 1);
+}
+
} // namespace doris
\ No newline at end of file
diff --git a/be/src/olap/cumulative_compaction_time_series_policy.h
b/be/src/olap/cumulative_compaction_time_series_policy.h
index 015dce055e9..4c134202e1d 100644
--- a/be/src/olap/cumulative_compaction_time_series_policy.h
+++ b/be/src/olap/cumulative_compaction_time_series_policy.h
@@ -59,6 +59,10 @@ public:
void update_cumulative_point(Tablet* tablet, const
std::vector<RowsetSharedPtr>& input_rowsets,
RowsetSharedPtr _output_rowset,
Version& last_delete_version) override;
+
+ void update_compaction_level(Tablet* tablet, const
std::vector<RowsetSharedPtr>& input_rowsets,
+ RowsetSharedPtr output_rowset) override;
+
std::string_view name() override { return CUMULATIVE_TIME_SERIES_POLICY; }
};
diff --git a/be/src/olap/full_compaction.cpp b/be/src/olap/full_compaction.cpp
index b247d6c0ea1..0d6660ca543 100644
--- a/be/src/olap/full_compaction.cpp
+++ b/be/src/olap/full_compaction.cpp
@@ -70,6 +70,9 @@ Status FullCompaction::execute_compact() {
RETURN_IF_ERROR(CompactionMixin::execute_compact());
+
tablet()->cumulative_compaction_policy()->update_compaction_level(tablet(),
_input_rowsets,
+
_output_rowset);
+
Version last_version = _input_rowsets.back()->version();
tablet()->cumulative_compaction_policy()->update_cumulative_point(tablet(),
_input_rowsets,
_output_rowset, last_version);
diff --git a/be/src/olap/rowset/rowset_meta.h b/be/src/olap/rowset/rowset_meta.h
index f5cae0fcd4e..884a2bd4de0 100644
--- a/be/src/olap/rowset/rowset_meta.h
+++ b/be/src/olap/rowset/rowset_meta.h
@@ -314,6 +314,12 @@ public:
void set_txn_expiration(int64_t expiration) {
_rowset_meta_pb.set_txn_expiration(expiration); }
+ void set_compaction_level(int64_t compaction_level) {
+ _rowset_meta_pb.set_compaction_level(compaction_level);
+ }
+
+ int64_t compaction_level() { return _rowset_meta_pb.compaction_level(); }
+
// Because the member field '_handle' is a raw pointer, use member func
'init' to replace copy ctor
RowsetMeta(const RowsetMeta&) = delete;
RowsetMeta operator=(const RowsetMeta&) = delete;
diff --git a/be/src/olap/tablet_meta.cpp b/be/src/olap/tablet_meta.cpp
index d5befdb50d1..e23ffca08f4 100644
--- a/be/src/olap/tablet_meta.cpp
+++ b/be/src/olap/tablet_meta.cpp
@@ -73,7 +73,8 @@ TabletMetaSharedPtr TabletMeta::create(
request.time_series_compaction_goal_size_mbytes,
request.time_series_compaction_file_count_threshold,
request.time_series_compaction_time_threshold_seconds,
- request.time_series_compaction_empty_rowsets_threshold);
+ request.time_series_compaction_empty_rowsets_threshold,
+ request.time_series_compaction_level_threshold);
}
TabletMeta::TabletMeta()
@@ -92,7 +93,8 @@ TabletMeta::TabletMeta(int64_t table_id, int64_t
partition_id, int64_t tablet_id
int64_t time_series_compaction_goal_size_mbytes,
int64_t time_series_compaction_file_count_threshold,
int64_t time_series_compaction_time_threshold_seconds,
- int64_t time_series_compaction_empty_rowsets_threshold)
+ int64_t time_series_compaction_empty_rowsets_threshold,
+ int64_t time_series_compaction_level_threshold)
: _tablet_uid(0, 0),
_schema(new TabletSchema),
_delete_bitmap(new DeleteBitmap(tablet_id)) {
@@ -122,6 +124,8 @@ TabletMeta::TabletMeta(int64_t table_id, int64_t
partition_id, int64_t tablet_id
time_series_compaction_time_threshold_seconds);
tablet_meta_pb.set_time_series_compaction_empty_rowsets_threshold(
time_series_compaction_empty_rowsets_threshold);
+ tablet_meta_pb.set_time_series_compaction_level_threshold(
+ time_series_compaction_level_threshold);
TabletSchemaPB* schema = tablet_meta_pb.mutable_schema();
schema->set_num_short_key_columns(tablet_schema.short_key_column_count);
schema->set_num_rows_per_row_block(config::default_num_rows_per_column_file_block);
@@ -324,7 +328,8 @@ TabletMeta::TabletMeta(const TabletMeta& b)
_time_series_compaction_time_threshold_seconds(
b._time_series_compaction_time_threshold_seconds),
_time_series_compaction_empty_rowsets_threshold(
- b._time_series_compaction_empty_rowsets_threshold) {};
+ b._time_series_compaction_empty_rowsets_threshold),
+
_time_series_compaction_level_threshold(b._time_series_compaction_level_threshold)
{};
void TabletMeta::init_column_from_tcolumn(uint32_t unique_id, const TColumn&
tcolumn,
ColumnPB* column) {
@@ -614,6 +619,8 @@ void TabletMeta::init_from_pb(const TabletMetaPB&
tablet_meta_pb) {
tablet_meta_pb.time_series_compaction_time_threshold_seconds();
_time_series_compaction_empty_rowsets_threshold =
tablet_meta_pb.time_series_compaction_empty_rowsets_threshold();
+ _time_series_compaction_level_threshold =
+ tablet_meta_pb.time_series_compaction_level_threshold();
}
void TabletMeta::to_meta_pb(TabletMetaPB* tablet_meta_pb) {
@@ -702,6 +709,8 @@ void TabletMeta::to_meta_pb(TabletMetaPB* tablet_meta_pb) {
time_series_compaction_time_threshold_seconds());
tablet_meta_pb->set_time_series_compaction_empty_rowsets_threshold(
time_series_compaction_empty_rowsets_threshold());
+ tablet_meta_pb->set_time_series_compaction_level_threshold(
+ time_series_compaction_level_threshold());
}
int64_t TabletMeta::mem_size() const {
@@ -897,6 +906,8 @@ bool operator==(const TabletMeta& a, const TabletMeta& b) {
if (a._time_series_compaction_empty_rowsets_threshold !=
b._time_series_compaction_empty_rowsets_threshold)
return false;
+ if (a._time_series_compaction_level_threshold !=
b._time_series_compaction_level_threshold)
+ return false;
return true;
}
diff --git a/be/src/olap/tablet_meta.h b/be/src/olap/tablet_meta.h
index d5d1322d341..362d3135219 100644
--- a/be/src/olap/tablet_meta.h
+++ b/be/src/olap/tablet_meta.h
@@ -110,7 +110,8 @@ public:
int64_t time_series_compaction_goal_size_mbytes = 1024,
int64_t time_series_compaction_file_count_threshold = 2000,
int64_t time_series_compaction_time_threshold_seconds = 3600,
- int64_t time_series_compaction_empty_rowsets_threshold = 5);
+ int64_t time_series_compaction_empty_rowsets_threshold = 5,
+ int64_t time_series_compaction_level_threshold = 1);
// If need add a filed in TableMeta, filed init copy in copy construct
function
TabletMeta(const TabletMeta& tablet_meta);
TabletMeta(TabletMeta&& tablet_meta) = delete;
@@ -263,6 +264,12 @@ public:
int64_t time_series_compaction_empty_rowsets_threshold() const {
return _time_series_compaction_empty_rowsets_threshold;
}
+ void set_time_series_compaction_level_threshold(int64_t level_threshold) {
+ _time_series_compaction_level_threshold = level_threshold;
+ }
+ int64_t time_series_compaction_level_threshold() const {
+ return _time_series_compaction_level_threshold;
+ }
private:
Status _save_meta(DataDir* data_dir);
@@ -317,6 +324,7 @@ private:
int64_t _time_series_compaction_file_count_threshold = 0;
int64_t _time_series_compaction_time_threshold_seconds = 0;
int64_t _time_series_compaction_empty_rowsets_threshold = 0;
+ int64_t _time_series_compaction_level_threshold = 0;
mutable std::shared_mutex _meta_lock;
};
diff --git
a/docs/en/docs/sql-manual/sql-reference/Data-Definition-Statements/Create/CREATE-TABLE.md
b/docs/en/docs/sql-manual/sql-reference/Data-Definition-Statements/Create/CREATE-TABLE.md
index 44dbf5cceb8..0bf9590ebf1 100644
---
a/docs/en/docs/sql-manual/sql-reference/Data-Definition-Statements/Create/CREATE-TABLE.md
+++
b/docs/en/docs/sql-manual/sql-reference/Data-Definition-Statements/Create/CREATE-TABLE.md
@@ -461,6 +461,15 @@ Set table properties. The following attributes are
currently supported:
`"time_series_compaction_time_threshold_seconds" = "3600"`
+* `time_series_compaction_level_threshold`
+
+ When time series compaction policy is applied, This parameter defaults to
1. When set to 2, it is used to control the re-merging of segments that have
been
+
+ merged once, ensuring that the segment size reaches the
time_series_compaction_goal_size_mbytes, which can achieve the effect of
reducing the number of
+
+ segments.
+
+ `"time_series_compaction_level_threshold" = "2"`
* Dynamic partition related
diff --git
a/docs/zh-CN/docs/sql-manual/sql-reference/Data-Definition-Statements/Create/CREATE-TABLE.md
b/docs/zh-CN/docs/sql-manual/sql-reference/Data-Definition-Statements/Create/CREATE-TABLE.md
index e8ac8e69f0a..657adea881d 100644
---
a/docs/zh-CN/docs/sql-manual/sql-reference/Data-Definition-Statements/Create/CREATE-TABLE.md
+++
b/docs/zh-CN/docs/sql-manual/sql-reference/Data-Definition-Statements/Create/CREATE-TABLE.md
@@ -444,6 +444,14 @@ UNIQUE KEY(k1, k2)
`"time_series_compaction_time_threshold_seconds" = "3600"`
+* `time_series_compaction_level_threshold`
+
+ compaction 的合并策略为 time_series
时,此参数默认为1,当设置为2时用来控住对于合并过一次的段再合并一层,保证段大小达到time_series_compaction_goal_size_mbytes,
+
+ 能达到段数量减少的效果。
+
+ `"time_series_compaction_level_threshold" = "2"`
+
* 动态分区相关
动态分区相关参数如下:
diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java
b/fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java
index 8c8d0e105b4..1f27d68f0be 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java
@@ -519,7 +519,9 @@ public class Alter {
|| properties
.containsKey(PropertyAnalyzer.PROPERTIES_SKIP_WRITE_INDEX_ON_LOAD)
|| properties
-
.containsKey(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_EMPTY_ROWSETS_THRESHOLD));
+
.containsKey(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_EMPTY_ROWSETS_THRESHOLD)
+ || properties
+
.containsKey(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_LEVEL_THRESHOLD));
((SchemaChangeHandler)
schemaChangeHandler).updateTableProperties(db, tableName, properties);
} else {
throw new DdlException("Invalid alter operation: " +
alterClause.getOpType());
diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java
b/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java
index f0d83866271..d6c8ea694c5 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java
@@ -261,6 +261,7 @@ public class RollupJobV2 extends AlterJobV2 implements
GsonPostProcessable {
tbl.getTimeSeriesCompactionFileCountThreshold(),
tbl.getTimeSeriesCompactionTimeThresholdSeconds(),
tbl.getTimeSeriesCompactionEmptyRowsetsThreshold(),
+ tbl.getTimeSeriesCompactionLevelThreshold(),
tbl.storeRowColumn(),
binlogConfig);
createReplicaTask.setBaseTablet(tabletIdMap.get(rollupTabletId),
baseSchemaHash);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java
b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java
index eb116b765e8..c295753fd27 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java
@@ -2237,6 +2237,13 @@ public class SchemaChangeHandler extends AlterHandler {
.get(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_EMPTY_ROWSETS_THRESHOLD)));
}
+ if
(properties.containsKey(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_LEVEL_THRESHOLD))
{
+ timeSeriesCompactionConfig
+
.put(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_LEVEL_THRESHOLD,
+ Long.parseLong(properties
+
.get(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_LEVEL_THRESHOLD)));
+ }
+
if (isInMemory < 0 && storagePolicyId < 0 && compactionPolicy == null
&& timeSeriesCompactionConfig.isEmpty()
&&
!properties.containsKey(PropertyAnalyzer.PROPERTIES_IS_BEING_SYNCED)
&&
!properties.containsKey(PropertyAnalyzer.PROPERTIES_ENABLE_SINGLE_REPLICA_COMPACTION)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java
b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java
index 8211dd3ccef..8c80321521b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java
@@ -273,6 +273,7 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
tbl.getTimeSeriesCompactionFileCountThreshold(),
tbl.getTimeSeriesCompactionTimeThresholdSeconds(),
tbl.getTimeSeriesCompactionEmptyRowsetsThreshold(),
+
tbl.getTimeSeriesCompactionLevelThreshold(),
tbl.storeRowColumn(),
binlogConfig);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/analysis/ModifyTablePropertiesClause.java
b/fe/fe-core/src/main/java/org/apache/doris/analysis/ModifyTablePropertiesClause.java
index cc4dede8f12..c2bc7bc7d0d 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/analysis/ModifyTablePropertiesClause.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/analysis/ModifyTablePropertiesClause.java
@@ -222,6 +222,23 @@ public class ModifyTablePropertiesClause extends
AlterTableClause {
}
this.needTableStable = false;
this.opType = AlterOpType.MODIFY_TABLE_PROPERTY_SYNC;
+ } else if
(properties.containsKey(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_LEVEL_THRESHOLD))
{
+ long levelThreshold;
+ String levelThresholdStr = properties
+
.get(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_LEVEL_THRESHOLD);
+ try {
+ levelThreshold = Long.parseLong(levelThresholdStr);
+ if (levelThreshold < 1 || levelThreshold > 2) {
+ throw new AnalysisException(
+ "time_series_compaction_level_threshold can not be
less than 1 or greater than 2:"
+ + levelThresholdStr);
+ }
+ } catch (NumberFormatException e) {
+ throw new AnalysisException("Invalid
time_series_compaction_level_threshold format: "
+ + levelThresholdStr);
+ }
+ this.needTableStable = false;
+ this.opType = AlterOpType.MODIFY_TABLE_PROPERTY_SYNC;
} else if
(properties.containsKey(PropertyAnalyzer.PROPERTIES_SKIP_WRITE_INDEX_ON_LOAD)) {
if
(!properties.get(PropertyAnalyzer.PROPERTIES_SKIP_WRITE_INDEX_ON_LOAD).equalsIgnoreCase("true")
&& !properties.get(PropertyAnalyzer
diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
index 75210819b6d..084ef61f19c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
@@ -1085,6 +1085,7 @@ public class RestoreJob extends AbstractJob {
localTbl.getTimeSeriesCompactionFileCountThreshold(),
localTbl.getTimeSeriesCompactionTimeThresholdSeconds(),
localTbl.getTimeSeriesCompactionEmptyRowsetsThreshold(),
+ localTbl.getTimeSeriesCompactionLevelThreshold(),
localTbl.storeRowColumn(),
binlogConfig);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
index 28abfefd2e3..c28c605d4ff 100755
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
@@ -3513,6 +3513,14 @@ public class Env {
sb.append(olapTable.getTimeSeriesCompactionEmptyRowsetsThreshold()).append("\"");
}
+ // time series compaction level threshold
+ if (olapTable.getCompactionPolicy() != null &&
olapTable.getCompactionPolicy()
+
.equals(PropertyAnalyzer.TIME_SERIES_COMPACTION_POLICY)) {
+ sb.append(",\n\"").append(PropertyAnalyzer
+
.PROPERTIES_TIME_SERIES_COMPACTION_LEVEL_THRESHOLD).append("\" = \"");
+
sb.append(olapTable.getTimeSeriesCompactionLevelThreshold()).append("\"");
+ }
+
// disable auto compaction
sb.append(",\n\"").append(PropertyAnalyzer.PROPERTIES_DISABLE_AUTO_COMPACTION).append("\"
= \"");
sb.append(olapTable.disableAutoCompaction()).append("\"");
@@ -4942,7 +4950,8 @@ public class Env {
.buildSkipWriteIndexOnLoad()
.buildDisableAutoCompaction()
.buildEnableSingleReplicaCompaction()
- .buildTimeSeriesCompactionEmptyRowsetsThreshold();
+ .buildTimeSeriesCompactionEmptyRowsetsThreshold()
+ .buildTimeSeriesCompactionLevelThreshold();
// need to update partition info meta
for (Partition partition : table.getPartitions()) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
index 449ad13e32e..91df9fa61a7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
@@ -2197,6 +2197,20 @@ public class OlapTable extends Table implements
MTMVRelatedTableIf {
return null;
}
+ public void setTimeSeriesCompactionLevelThreshold(long
timeSeriesCompactionLevelThreshold) {
+ TableProperty tableProperty = getOrCreatTableProperty();
+
tableProperty.modifyTableProperties(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_LEVEL_THRESHOLD,
+
Long.valueOf(timeSeriesCompactionLevelThreshold).toString());
+ tableProperty.buildTimeSeriesCompactionLevelThreshold();
+ }
+
+ public Long getTimeSeriesCompactionLevelThreshold() {
+ if (tableProperty != null) {
+ return tableProperty.timeSeriesCompactionLevelThreshold();
+ }
+ return null;
+ }
+
public int getBaseSchemaVersion() {
MaterializedIndexMeta baseIndexMeta = indexIdToMeta.get(baseIndexId);
return baseIndexMeta.getSchemaVersion();
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/catalog/TableProperty.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/TableProperty.java
index 23a844538a4..2c3fa1fbb6d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/TableProperty.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/TableProperty.java
@@ -105,6 +105,9 @@ public class TableProperty implements Writable {
private long timeSeriesCompactionEmptyRowsetsThreshold
=
PropertyAnalyzer.TIME_SERIES_COMPACTION_EMPTY_ROWSETS_THRESHOLD_DEFAULT_VALUE;
+ private long timeSeriesCompactionLevelThreshold
+ =
PropertyAnalyzer.TIME_SERIES_COMPACTION_LEVEL_THRESHOLD_DEFAULT_VALUE;
+
private DataSortInfo dataSortInfo = new DataSortInfo();
public TableProperty(Map<String, String> properties) {
@@ -142,6 +145,7 @@ public class TableProperty implements Writable {
buildEnableSingleReplicaCompaction();
buildDisableAutoCompaction();
buildTimeSeriesCompactionEmptyRowsetsThreshold();
+ buildTimeSeriesCompactionLevelThreshold();
break;
default:
break;
@@ -308,6 +312,17 @@ public class TableProperty implements Writable {
return timeSeriesCompactionEmptyRowsetsThreshold;
}
+ public TableProperty buildTimeSeriesCompactionLevelThreshold() {
+ timeSeriesCompactionLevelThreshold = Long.parseLong(properties
+
.getOrDefault(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_LEVEL_THRESHOLD,
+
String.valueOf(PropertyAnalyzer.TIME_SERIES_COMPACTION_LEVEL_THRESHOLD_DEFAULT_VALUE)));
+ return this;
+ }
+
+ public long timeSeriesCompactionLevelThreshold() {
+ return timeSeriesCompactionLevelThreshold;
+ }
+
public TableProperty buildMinLoadReplicaNum() {
minLoadReplicaNum = Short.parseShort(
properties.getOrDefault(PropertyAnalyzer.PROPERTIES_MIN_LOAD_REPLICA_NUM,
"-1"));
@@ -584,7 +599,8 @@ public class TableProperty implements Writable {
.buildTimeSeriesCompactionTimeThresholdSeconds()
.buildDisableAutoCompaction()
.buildEnableSingleReplicaCompaction()
- .buildTimeSeriesCompactionEmptyRowsetsThreshold();
+ .buildTimeSeriesCompactionEmptyRowsetsThreshold()
+ .buildTimeSeriesCompactionLevelThreshold();
if (Env.getCurrentEnvJournalVersion() < FeMetaVersion.VERSION_105) {
// get replica num from property map and create replica allocation
String repNum =
tableProperty.properties.remove(PropertyAnalyzer.PROPERTIES_REPLICATION_NUM);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java
b/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java
index 1adea8d1b1e..7a47d8ca072 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java
@@ -152,6 +152,9 @@ public class PropertyAnalyzer {
public static final String
PROPERTIES_TIME_SERIES_COMPACTION_EMPTY_ROWSETS_THRESHOLD =
"time_series_compaction_empty_rowsets_threshold";
+ public static final String
PROPERTIES_TIME_SERIES_COMPACTION_LEVEL_THRESHOLD =
+ "time_series_compaction_level_threshold";
+
public static final String PROPERTIES_MUTABLE = "mutable";
public static final String PROPERTIES_IS_BEING_SYNCED = "is_being_synced";
@@ -196,6 +199,7 @@ public class PropertyAnalyzer {
public static final long
TIME_SERIES_COMPACTION_FILE_COUNT_THRESHOLD_DEFAULT_VALUE = 2000;
public static final long
TIME_SERIES_COMPACTION_TIME_THRESHOLD_SECONDS_DEFAULT_VALUE = 3600;
public static final long
TIME_SERIES_COMPACTION_EMPTY_ROWSETS_THRESHOLD_DEFAULT_VALUE = 5;
+ public static final long
TIME_SERIES_COMPACTION_LEVEL_THRESHOLD_DEFAULT_VALUE = 1;
public enum RewriteType {
PUT, // always put property
@@ -835,6 +839,30 @@ public class PropertyAnalyzer {
return emptyRowsetsThreshold;
}
+ public static long analyzeTimeSeriesCompactionLevelThreshold(Map<String,
String> properties)
+ throws AnalysisException {
+ long levelThreshold =
TIME_SERIES_COMPACTION_LEVEL_THRESHOLD_DEFAULT_VALUE;
+ if (properties == null || properties.isEmpty()) {
+ return levelThreshold;
+ }
+ if
(properties.containsKey(PROPERTIES_TIME_SERIES_COMPACTION_LEVEL_THRESHOLD)) {
+ String levelThresholdStr = properties
+
.get(PROPERTIES_TIME_SERIES_COMPACTION_LEVEL_THRESHOLD);
+
properties.remove(PROPERTIES_TIME_SERIES_COMPACTION_LEVEL_THRESHOLD);
+ try {
+ levelThreshold = Long.parseLong(levelThresholdStr);
+ if (levelThreshold < 1 || levelThreshold > 2) {
+ throw new
AnalysisException("time_series_compaction_level_threshold can not"
+ + " less than 1 or greater than 2: " +
levelThreshold);
+ }
+ } catch (NumberFormatException e) {
+ throw new AnalysisException("Invalid
time_series_compaction_level_threshold: "
+ + levelThreshold);
+ }
+ }
+ return levelThreshold;
+ }
+
public static long
analyzeTimeSeriesCompactionFileCountThreshold(Map<String, String> properties)
throws AnalysisException {
long fileCountThreshold =
TIME_SERIES_COMPACTION_FILE_COUNT_THRESHOLD_DEFAULT_VALUE;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
index 3b689f44ba3..90d152985ac 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
@@ -1512,6 +1512,10 @@ public class InternalCatalog implements
CatalogIf<Database> {
properties.put(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_EMPTY_ROWSETS_THRESHOLD,
olapTable.getTimeSeriesCompactionEmptyRowsetsThreshold().toString());
}
+ if
(!properties.containsKey(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_LEVEL_THRESHOLD))
{
+
properties.put(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_LEVEL_THRESHOLD,
+
olapTable.getTimeSeriesCompactionLevelThreshold().toString());
+ }
if
(!properties.containsKey(PropertyAnalyzer.PROPERTIES_STORAGE_POLICY)) {
properties.put(PropertyAnalyzer.PROPERTIES_STORAGE_POLICY,
olapTable.getStoragePolicy());
}
@@ -1944,6 +1948,7 @@ public class InternalCatalog implements
CatalogIf<Database> {
tbl.getTimeSeriesCompactionFileCountThreshold(),
tbl.getTimeSeriesCompactionTimeThresholdSeconds(),
tbl.getTimeSeriesCompactionEmptyRowsetsThreshold(),
+ tbl.getTimeSeriesCompactionLevelThreshold(),
tbl.storeRowColumn(), binlogConfig);
task.setStorageFormat(tbl.getStorageFormat());
@@ -2225,7 +2230,9 @@ public class InternalCatalog implements
CatalogIf<Database> {
|| properties
.containsKey(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_TIME_THRESHOLD_SECONDS)
|| properties
-
.containsKey(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_EMPTY_ROWSETS_THRESHOLD)))
{
+
.containsKey(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_EMPTY_ROWSETS_THRESHOLD)
+ || properties
+
.containsKey(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_LEVEL_THRESHOLD)))
{
throw new DdlException("only time series compaction policy support
for time series config");
}
@@ -2273,6 +2280,17 @@ public class InternalCatalog implements
CatalogIf<Database> {
}
olapTable.setTimeSeriesCompactionEmptyRowsetsThreshold(timeSeriesCompactionEmptyRowsetsThreshold);
+ // set time series compaction level threshold
+ long timeSeriesCompactionLevelThreshold
+ =
PropertyAnalyzer.TIME_SERIES_COMPACTION_LEVEL_THRESHOLD_DEFAULT_VALUE;
+ try {
+ timeSeriesCompactionLevelThreshold = PropertyAnalyzer
+
.analyzeTimeSeriesCompactionLevelThreshold(properties);
+ } catch (AnalysisException e) {
+ throw new DdlException(e.getMessage());
+ }
+
olapTable.setTimeSeriesCompactionLevelThreshold(timeSeriesCompactionLevelThreshold);
+
// get storage format
TStorageFormat storageFormat = TStorageFormat.V2; // default is
segment v2
try {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java
b/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java
index 6950dc83e9a..6b24bea1ed5 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java
@@ -849,6 +849,7 @@ public class ReportHandler extends Daemon {
olapTable.getTimeSeriesCompactionFileCountThreshold(),
olapTable.getTimeSeriesCompactionTimeThresholdSeconds(),
olapTable.getTimeSeriesCompactionEmptyRowsetsThreshold(),
+
olapTable.getTimeSeriesCompactionLevelThreshold(),
olapTable.storeRowColumn(),
binlogConfig);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/task/CreateReplicaTask.java
b/fe/fe-core/src/main/java/org/apache/doris/task/CreateReplicaTask.java
index 476de574b65..2f8377b0949 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/task/CreateReplicaTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/task/CreateReplicaTask.java
@@ -113,6 +113,8 @@ public class CreateReplicaTask extends AgentTask {
private long timeSeriesCompactionEmptyRowsetsThreshold;
+ private long timeSeriesCompactionLevelThreshold;
+
private boolean storeRowColumn;
private BinlogConfig binlogConfig;
@@ -137,6 +139,7 @@ public class CreateReplicaTask extends AgentTask {
long timeSeriesCompactionFileCountThreshold,
long timeSeriesCompactionTimeThresholdSeconds,
long timeSeriesCompactionEmptyRowsetsThreshold,
+ long timeSeriesCompactionLevelThreshold,
boolean storeRowColumn,
BinlogConfig binlogConfig) {
super(null, backendId, TTaskType.CREATE, dbId, tableId, partitionId,
indexId, tabletId);
@@ -179,6 +182,7 @@ public class CreateReplicaTask extends AgentTask {
this.timeSeriesCompactionFileCountThreshold =
timeSeriesCompactionFileCountThreshold;
this.timeSeriesCompactionTimeThresholdSeconds =
timeSeriesCompactionTimeThresholdSeconds;
this.timeSeriesCompactionEmptyRowsetsThreshold =
timeSeriesCompactionEmptyRowsetsThreshold;
+ this.timeSeriesCompactionLevelThreshold =
timeSeriesCompactionLevelThreshold;
this.storeRowColumn = storeRowColumn;
this.binlogConfig = binlogConfig;
}
@@ -333,6 +337,7 @@ public class CreateReplicaTask extends AgentTask {
createTabletReq.setTimeSeriesCompactionFileCountThreshold(timeSeriesCompactionFileCountThreshold);
createTabletReq.setTimeSeriesCompactionTimeThresholdSeconds(timeSeriesCompactionTimeThresholdSeconds);
createTabletReq.setTimeSeriesCompactionEmptyRowsetsThreshold(timeSeriesCompactionEmptyRowsetsThreshold);
+
createTabletReq.setTimeSeriesCompactionLevelThreshold(timeSeriesCompactionLevelThreshold);
if (binlogConfig != null) {
createTabletReq.setBinlogConfig(binlogConfig.toThrift());
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/task/UpdateTabletMetaInfoTask.java
b/fe/fe-core/src/main/java/org/apache/doris/task/UpdateTabletMetaInfoTask.java
index ad20b7b918d..7d4c6a3d022 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/task/UpdateTabletMetaInfoTask.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/task/UpdateTabletMetaInfoTask.java
@@ -164,6 +164,11 @@ public class UpdateTabletMetaInfoTask extends AgentTask {
metaInfo.setTimeSeriesCompactionEmptyRowsetsThreshold(timeSeriesCompactionConfig
.get(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_EMPTY_ROWSETS_THRESHOLD));
}
+ if (timeSeriesCompactionConfig
+
.containsKey(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_LEVEL_THRESHOLD))
{
+
metaInfo.setTimeSeriesCompactionLevelThreshold(timeSeriesCompactionConfig
+
.get(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_LEVEL_THRESHOLD));
+ }
}
if (enableSingleReplicaCompaction >= 0) {
metaInfo.setEnableSingleReplicaCompaction(enableSingleReplicaCompaction > 0);
diff --git a/fe/fe-core/src/test/java/org/apache/doris/task/AgentTaskTest.java
b/fe/fe-core/src/test/java/org/apache/doris/task/AgentTaskTest.java
index 60ec442bedf..3dc4bc4695c 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/task/AgentTaskTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/task/AgentTaskTest.java
@@ -107,7 +107,7 @@ public class AgentTaskTest {
createReplicaTask = new CreateReplicaTask(backendId1, dbId, tableId,
partitionId,
indexId1, tabletId1, replicaId1, shortKeyNum, schemaHash1,
version, KeysType.AGG_KEYS, storageType,
TStorageMedium.SSD, columns, null, 0, latch, null, false,
TTabletType.TABLET_TYPE_DISK, null,
- TCompressionType.LZ4F, false, "", false, false, false, "", 0,
0, 0, 0, false, null);
+ TCompressionType.LZ4F, false, "", false, false, false, "", 0,
0, 0, 0, 0, false, null);
// drop
dropTask = new DropReplicaTask(backendId1, tabletId1, replicaId1,
schemaHash1, false);
diff --git a/gensrc/proto/olap_file.proto b/gensrc/proto/olap_file.proto
index cf1d94c3aae..6bc2847c309 100644
--- a/gensrc/proto/olap_file.proto
+++ b/gensrc/proto/olap_file.proto
@@ -117,6 +117,7 @@ message RowsetMetaPB {
reserved 50;
// to indicate whether the data between the segments overlap
optional SegmentsOverlapPB segments_overlap_pb = 51 [default =
OVERLAP_UNKNOWN];
+ optional int64 compaction_level = 52 [default = 0];
// For cloud
// for data recycling
@@ -457,6 +458,7 @@ message TabletMetaPB {
optional int64 time_series_compaction_file_count_threshold = 30 [default =
2000];
optional int64 time_series_compaction_time_threshold_seconds = 31 [default
= 3600];
optional int64 time_series_compaction_empty_rowsets_threshold = 32
[default = 5];
+ optional int64 time_series_compaction_level_threshold = 33 [default = 1];
// For cloud
optional int64 index_id = 1000;
@@ -510,6 +512,7 @@ message TabletMetaCloudPB {
optional int64 time_series_compaction_file_count_threshold = 33 [default =
2000];
optional int64 time_series_compaction_time_threshold_seconds = 34 [default
= 3600];
optional int64 time_series_compaction_empty_rowsets_threshold = 35
[default = 5];
+ optional int64 time_series_compaction_level_threshold = 36 [default = 1];
// Use for selectdb-cloud
optional string table_name = 101;
diff --git a/gensrc/thrift/AgentService.thrift
b/gensrc/thrift/AgentService.thrift
index b9b50663c3e..83a39ba90f1 100644
--- a/gensrc/thrift/AgentService.thrift
+++ b/gensrc/thrift/AgentService.thrift
@@ -151,6 +151,7 @@ struct TCreateTabletReq {
24: optional i64 time_series_compaction_file_count_threshold = 2000
25: optional i64 time_series_compaction_time_threshold_seconds = 3600
26: optional i64 time_series_compaction_empty_rowsets_threshold = 5
+ 27: optional i64 time_series_compaction_level_threshold = 1
// For cloud
1000: optional bool is_in_memory = false
@@ -440,6 +441,7 @@ struct TTabletMetaInfo {
15: optional bool skip_write_index_on_load
16: optional bool disable_auto_compaction
17: optional i64 time_series_compaction_empty_rowsets_threshold
+ 18: optional i64 time_series_compaction_level_threshold
}
struct TUpdateTabletMetaInfoReq {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]