This is an automated email from the ASF dual-hosted git repository.
kxiao pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new 2a7db2a4ff1 [opt](invert index) modify of time series compaction
policy #31488 (#32483)
2a7db2a4ff1 is described below
commit 2a7db2a4ff155cdc72930bffc935f3f16eb6e283
Author: zzzxl <[email protected]>
AuthorDate: Thu Mar 21 14:19:57 2024 +0800
[opt](invert index) modify of time series compaction policy #31488 (#32483)
---
be/src/agent/task_worker_pool.cpp | 23 +++--
be/src/olap/cumulative_compaction.cpp | 8 +-
be/src/olap/cumulative_compaction_policy.h | 8 ++
.../cumulative_compaction_time_series_policy.cpp | 108 +++++++++++++++++++--
.../cumulative_compaction_time_series_policy.h | 4 +
be/src/olap/full_compaction.cpp | 6 +-
be/src/olap/rowset/rowset_meta.h | 6 ++
be/src/olap/tablet_meta.cpp | 17 +++-
be/src/olap/tablet_meta.h | 10 +-
.../Create/CREATE-TABLE.md | 9 ++
.../Create/CREATE-TABLE.md | 8 ++
.../main/java/org/apache/doris/alter/Alter.java | 4 +-
.../java/org/apache/doris/alter/RollupJobV2.java | 1 +
.../apache/doris/alter/SchemaChangeHandler.java | 7 ++
.../org/apache/doris/alter/SchemaChangeJobV2.java | 1 +
.../analysis/ModifyTablePropertiesClause.java | 17 ++++
.../java/org/apache/doris/backup/RestoreJob.java | 1 +
.../main/java/org/apache/doris/catalog/Env.java | 11 ++-
.../java/org/apache/doris/catalog/OlapTable.java | 14 +++
.../org/apache/doris/catalog/TableProperty.java | 18 +++-
.../apache/doris/common/util/PropertyAnalyzer.java | 29 +++++-
.../apache/doris/datasource/InternalCatalog.java | 20 +++-
.../org/apache/doris/master/ReportHandler.java | 1 +
.../org/apache/doris/task/CreateReplicaTask.java | 5 +
.../doris/task/UpdateTabletMetaInfoTask.java | 5 +
.../java/org/apache/doris/task/AgentTaskTest.java | 2 +-
gensrc/proto/olap_file.proto | 2 +
gensrc/thrift/AgentService.thrift | 2 +
28 files changed, 320 insertions(+), 27 deletions(-)
diff --git a/be/src/agent/task_worker_pool.cpp
b/be/src/agent/task_worker_pool.cpp
index 00c16cfa12d..34918a5b0a2 100644
--- a/be/src/agent/task_worker_pool.cpp
+++ b/be/src/agent/task_worker_pool.cpp
@@ -52,6 +52,7 @@
#include "io/fs/local_file_system.h"
#include "io/fs/path.h"
#include "io/fs/s3_file_system.h"
+#include "olap/cumulative_compaction_time_series_policy.h"
#include "olap/data_dir.h"
#include "olap/olap_common.h"
#include "olap/rowset/rowset_meta.h"
@@ -691,8 +692,8 @@ void update_tablet_meta_callback(StorageEngine& engine,
const TAgentTaskRequest&
need_to_save = true;
}
if (tablet_meta_info.__isset.compaction_policy) {
- if (tablet_meta_info.compaction_policy != "size_based" &&
- tablet_meta_info.compaction_policy != "time_series") {
+ if (tablet_meta_info.compaction_policy !=
CUMULATIVE_SIZE_BASED_POLICY &&
+ tablet_meta_info.compaction_policy !=
CUMULATIVE_TIME_SERIES_POLICY) {
status = Status::InvalidArgument(
"invalid compaction policy, only support for
size_based or "
"time_series");
@@ -702,7 +703,7 @@ void update_tablet_meta_callback(StorageEngine& engine,
const TAgentTaskRequest&
need_to_save = true;
}
if (tablet_meta_info.__isset.time_series_compaction_goal_size_mbytes) {
- if (tablet->tablet_meta()->compaction_policy() != "time_series") {
+ if (tablet->tablet_meta()->compaction_policy() !=
CUMULATIVE_TIME_SERIES_POLICY) {
status = Status::InvalidArgument(
"only time series compaction policy support time
series config");
continue;
@@ -712,7 +713,7 @@ void update_tablet_meta_callback(StorageEngine& engine,
const TAgentTaskRequest&
need_to_save = true;
}
if
(tablet_meta_info.__isset.time_series_compaction_file_count_threshold) {
- if (tablet->tablet_meta()->compaction_policy() != "time_series") {
+ if (tablet->tablet_meta()->compaction_policy() !=
CUMULATIVE_TIME_SERIES_POLICY) {
status = Status::InvalidArgument(
"only time series compaction policy support time
series config");
continue;
@@ -722,7 +723,7 @@ void update_tablet_meta_callback(StorageEngine& engine,
const TAgentTaskRequest&
need_to_save = true;
}
if
(tablet_meta_info.__isset.time_series_compaction_time_threshold_seconds) {
- if (tablet->tablet_meta()->compaction_policy() != "time_series") {
+ if (tablet->tablet_meta()->compaction_policy() !=
CUMULATIVE_TIME_SERIES_POLICY) {
status = Status::InvalidArgument(
"only time series compaction policy support time
series config");
continue;
@@ -732,7 +733,7 @@ void update_tablet_meta_callback(StorageEngine& engine,
const TAgentTaskRequest&
need_to_save = true;
}
if
(tablet_meta_info.__isset.time_series_compaction_empty_rowsets_threshold) {
- if (tablet->tablet_meta()->compaction_policy() != "time_series") {
+ if (tablet->tablet_meta()->compaction_policy() !=
CUMULATIVE_TIME_SERIES_POLICY) {
status = Status::InvalidArgument(
"only time series compaction policy support time
series config");
continue;
@@ -741,6 +742,16 @@ void update_tablet_meta_callback(StorageEngine& engine,
const TAgentTaskRequest&
tablet_meta_info.time_series_compaction_empty_rowsets_threshold);
need_to_save = true;
}
+ if (tablet_meta_info.__isset.time_series_compaction_level_threshold) {
+ if (tablet->tablet_meta()->compaction_policy() !=
CUMULATIVE_TIME_SERIES_POLICY) {
+ status = Status::InvalidArgument(
+ "only time series compaction policy support time
series config");
+ continue;
+ }
+ tablet->tablet_meta()->set_time_series_compaction_level_threshold(
+ tablet_meta_info.time_series_compaction_level_threshold);
+ need_to_save = true;
+ }
if (tablet_meta_info.__isset.replica_id) {
tablet->tablet_meta()->set_replica_id(tablet_meta_info.replica_id);
}
diff --git a/be/src/olap/cumulative_compaction.cpp
b/be/src/olap/cumulative_compaction.cpp
index 1f54c1f3285..42748012cab 100644
--- a/be/src/olap/cumulative_compaction.cpp
+++ b/be/src/olap/cumulative_compaction.cpp
@@ -81,13 +81,17 @@ Status CumulativeCompaction::execute_compact_impl() {
// 4. set state to success
_state = CompactionState::SUCCESS;
- // 5. set cumulative point
+ // 5. set compaction level
+
_tablet->cumulative_compaction_policy()->update_compaction_level(_tablet.get(),
_input_rowsets,
+
_output_rowset);
+
+ // 6. set cumulative point
_tablet->cumulative_compaction_policy()->update_cumulative_point(
_tablet.get(), _input_rowsets, _output_rowset,
_last_delete_version);
VLOG_CRITICAL << "after cumulative compaction, current cumulative point is
"
<< _tablet->cumulative_layer_point() << ", tablet=" <<
_tablet->tablet_id();
- // 6. add metric to cumulative compaction
+ // 7. add metric to cumulative compaction
DorisMetrics::instance()->cumulative_compaction_deltas_total->increment(_input_rowsets.size());
DorisMetrics::instance()->cumulative_compaction_bytes_total->increment(_input_rowsets_size);
diff --git a/be/src/olap/cumulative_compaction_policy.h
b/be/src/olap/cumulative_compaction_policy.h
index b9b3bb46c0f..e2e5ca5460d 100644
--- a/be/src/olap/cumulative_compaction_policy.h
+++ b/be/src/olap/cumulative_compaction_policy.h
@@ -96,6 +96,11 @@ public:
int64_t current_cumulative_point,
int64_t* cumulative_point) = 0;
+ // Updates the compaction level of a tablet after a compaction operation.
+ virtual void update_compaction_level(Tablet* tablet,
+ const std::vector<RowsetSharedPtr>&
input_rowsets,
+ RowsetSharedPtr output_rowset) = 0;
+
/// Fetch cumulative policy name
virtual std::string_view name() = 0;
};
@@ -149,6 +154,9 @@ public:
/// Its main policy is calculating the accumulative compaction score after
current cumulative_point in tablet.
uint32_t calc_cumulative_compaction_score(Tablet* tablet) override;
+ void update_compaction_level(Tablet* tablet, const
std::vector<RowsetSharedPtr>& input_rowsets,
+ RowsetSharedPtr output_rowset) override {}
+
std::string_view name() override { return CUMULATIVE_SIZE_BASED_POLICY; }
private:
diff --git a/be/src/olap/cumulative_compaction_time_series_policy.cpp
b/be/src/olap/cumulative_compaction_time_series_policy.cpp
index 6f7cb8e2e35..6c3f949723a 100644
--- a/be/src/olap/cumulative_compaction_time_series_policy.cpp
+++ b/be/src/olap/cumulative_compaction_time_series_policy.cpp
@@ -17,19 +17,24 @@
#include "olap/cumulative_compaction_time_series_policy.h"
+#include <algorithm>
+
#include "common/logging.h"
#include "olap/tablet.h"
+#include "olap/tablet_meta.h"
#include "util/time.h"
namespace doris {
uint32_t
TimeSeriesCumulativeCompactionPolicy::calc_cumulative_compaction_score(Tablet*
tablet) {
uint32_t score = 0;
+ uint32_t level0_score = 0;
bool base_rowset_exist = false;
const int64_t point = tablet->cumulative_layer_point();
- int64_t total_size = 0;
+ int64_t level0_total_size = 0;
RowsetMetaSharedPtr first_meta;
int64_t first_version = INT64_MAX;
+ std::list<RowsetMetaSharedPtr> checked_rs_metas;
// NOTE: tablet._meta_lock is hold
auto& rs_metas = tablet->tablet_meta()->all_rs_metas();
// check the base rowset and collect the rowsets of cumulative part
@@ -47,8 +52,13 @@ uint32_t
TimeSeriesCumulativeCompactionPolicy::calc_cumulative_compaction_score(
continue;
} else {
// collect the rowsets of cumulative part
- total_size += rs_meta->total_disk_size();
score += rs_meta->get_compaction_score();
+ if (rs_meta->compaction_level() == 0) {
+ level0_total_size += rs_meta->total_disk_size();
+ level0_score += rs_meta->get_compaction_score();
+ } else {
+ checked_rs_metas.push_back(rs_meta);
+ }
}
}
@@ -73,21 +83,39 @@ uint32_t
TimeSeriesCumulativeCompactionPolicy::calc_cumulative_compaction_score(
// Condition 1: the size of input files for compaction meets the
requirement of parameter compaction_goal_size
int64_t compaction_goal_size_mbytes =
tablet->tablet_meta()->time_series_compaction_goal_size_mbytes();
- if (total_size >= compaction_goal_size_mbytes * 1024 * 1024) {
+ if (level0_total_size >= compaction_goal_size_mbytes * 1024 * 1024) {
return score;
}
// Condition 2: the number of input files reaches the threshold specified
by parameter compaction_file_count_threshold
- if (score >=
tablet->tablet_meta()->time_series_compaction_file_count_threshold()) {
+ if (level0_score >=
tablet->tablet_meta()->time_series_compaction_file_count_threshold()) {
return score;
}
+ // Condition 3: two or more non-level-0 rowsets together reach compaction_goal_size
+ if (tablet->tablet_meta()->time_series_compaction_level_threshold() >= 2) {
+ checked_rs_metas.sort([](const RowsetMetaSharedPtr& a, const
RowsetMetaSharedPtr& b) {
+ return a->version().first < b->version().first;
+ });
+ int32_t rs_meta_count = 0;
+ int64_t continuous_size = 0;
+ for (const auto& rs_meta : checked_rs_metas) {
+ rs_meta_count++;
+ continuous_size += rs_meta->total_disk_size();
+ if (rs_meta_count >= 2) {
+ if (continuous_size >= compaction_goal_size_mbytes * 1024 *
1024) {
+ return score;
+ }
+ }
+ }
+ }
+
int64_t now = UnixMillis();
int64_t last_cumu = tablet->last_cumu_compaction_success_time();
if (last_cumu != 0) {
int64_t cumu_interval = now - last_cumu;
- // Condition 3: the time interval between compactions exceeds the
value specified by parameter _compaction_time_threshold_second
+ // Condition 4: the time interval between compactions exceeds the
value specified by parameter _compaction_time_threshold_second
if (cumu_interval >
(tablet->tablet_meta()->time_series_compaction_time_threshold_seconds() *
1000)) {
return score;
@@ -163,6 +191,13 @@ void
TimeSeriesCumulativeCompactionPolicy::calculate_cumulative_point(
break;
}
+ // upgrade: [0 0 2 1 1 0 0]
+ if (!is_delete &&
tablet->tablet_meta()->time_series_compaction_level_threshold() >= 2 &&
+ rs->compaction_level() == 1) {
+ *ret_cumulative_point = rs->version().first;
+ break;
+ }
+
// include one situation: When the segment is not deleted, and is
singleton delta, and is NONOVERLAPPING, ret_cumulative_point increase
prev_version = rs->version().second;
*ret_cumulative_point = prev_version + 1;
@@ -193,6 +228,9 @@ int
TimeSeriesCumulativeCompactionPolicy::pick_input_rowsets(
return 0;
}
+ int64_t compaction_goal_size_mbytes =
+ tablet->tablet_meta()->time_series_compaction_goal_size_mbytes();
+
int transient_size = 0;
*compaction_score = 0;
input_rowsets->clear();
@@ -231,8 +269,7 @@ int
TimeSeriesCumulativeCompactionPolicy::pick_input_rowsets(
input_rowsets->push_back(rowset);
// Condition 1: the size of input files for compaction meets the
requirement of parameter compaction_goal_size
- if (total_size >=
- (tablet->tablet_meta()->time_series_compaction_goal_size_mbytes()
* 1024 * 1024)) {
+ if (total_size >= (compaction_goal_size_mbytes * 1024 * 1024)) {
if (input_rowsets->size() == 1 &&
!input_rowsets->front()->rowset_meta()->is_segments_overlapping()) {
// Only 1 non-overlapping rowset, skip it
@@ -262,20 +299,47 @@ int
TimeSeriesCumulativeCompactionPolicy::pick_input_rowsets(
return transient_size;
}
+ // Condition 3: the leading run of non-level-0 candidate rowsets (at least two) reaches compaction_goal_size
+ std::vector<RowsetSharedPtr> level1_rowsets;
+ if (tablet->tablet_meta()->time_series_compaction_level_threshold() >= 2) {
+ int64_t continuous_size = 0;
+ for (const auto& rowset : candidate_rowsets) {
+ const auto& rs_meta = rowset->rowset_meta();
+ if (rs_meta->compaction_level() == 0) {
+ break;
+ }
+ level1_rowsets.push_back(rowset);
+ continuous_size += rs_meta->total_disk_size();
+ if (level1_rowsets.size() >= 2) {
+ if (continuous_size >= compaction_goal_size_mbytes * 1024 *
1024) {
+ input_rowsets->swap(level1_rowsets);
+ return input_rowsets->size();
+ }
+ }
+ }
+ }
+
int64_t now = UnixMillis();
int64_t last_cumu = tablet->last_cumu_compaction_success_time();
if (last_cumu != 0) {
int64_t cumu_interval = now - last_cumu;
- // Condition 3: the time interval between compactions exceeds the
value specified by parameter compaction_time_threshold_second
+ // Condition 4: the time interval between compactions exceeds the
value specified by parameter compaction_time_threshold_second
if (cumu_interval >
(tablet->tablet_meta()->time_series_compaction_time_threshold_seconds() *
1000)) {
+ if
(tablet->tablet_meta()->time_series_compaction_level_threshold() >= 2) {
+ if (input_rowsets->empty() && level1_rowsets.size() >= 2) {
+ input_rowsets->swap(level1_rowsets);
+ return input_rowsets->size();
+ }
+ }
return transient_size;
}
}
input_rowsets->clear();
*compaction_score = 0;
+
return 0;
}
@@ -288,7 +352,35 @@ void
TimeSeriesCumulativeCompactionPolicy::update_cumulative_point(
return;
}
+ if (tablet->tablet_meta()->time_series_compaction_level_threshold() >= 2 &&
+ output_rowset->rowset_meta()->compaction_level() < 2) {
+ return;
+ }
+
tablet->set_cumulative_layer_point(output_rowset->end_version() + 1);
}
+void TimeSeriesCumulativeCompactionPolicy::update_compaction_level(
+ Tablet* tablet, const std::vector<RowsetSharedPtr>& input_rowsets,
+ RowsetSharedPtr output_rowset) {
+ if (tablet->tablet_state() != TABLET_RUNNING ||
output_rowset->num_segments() == 0) {
+ return;
+ }
+
+ int64_t first_level = 0;
+ for (size_t i = 0; i < input_rowsets.size(); i++) {
+ int64_t cur_level =
input_rowsets[i]->rowset_meta()->compaction_level();
+ if (i == 0) {
+ first_level = cur_level;
+ } else {
+ if (first_level != cur_level) {
+ LOG(ERROR) << "Failed to check compaction level, first_level:
" << first_level
+ << ", cur_level: " << cur_level;
+ }
+ }
+ }
+
+ output_rowset->rowset_meta()->set_compaction_level(first_level + 1);
+}
+
} // namespace doris
\ No newline at end of file
diff --git a/be/src/olap/cumulative_compaction_time_series_policy.h
b/be/src/olap/cumulative_compaction_time_series_policy.h
index 015dce055e9..4c134202e1d 100644
--- a/be/src/olap/cumulative_compaction_time_series_policy.h
+++ b/be/src/olap/cumulative_compaction_time_series_policy.h
@@ -59,6 +59,10 @@ public:
void update_cumulative_point(Tablet* tablet, const
std::vector<RowsetSharedPtr>& input_rowsets,
RowsetSharedPtr _output_rowset,
Version& last_delete_version) override;
+
+ void update_compaction_level(Tablet* tablet, const
std::vector<RowsetSharedPtr>& input_rowsets,
+ RowsetSharedPtr output_rowset) override;
+
std::string_view name() override { return CUMULATIVE_TIME_SERIES_POLICY; }
};
diff --git a/be/src/olap/full_compaction.cpp b/be/src/olap/full_compaction.cpp
index 927b4a33198..ba336c7cbe9 100644
--- a/be/src/olap/full_compaction.cpp
+++ b/be/src/olap/full_compaction.cpp
@@ -73,7 +73,11 @@ Status FullCompaction::execute_compact_impl() {
// 3. set state to success
_state = CompactionState::SUCCESS;
- // 4. set cumulative point
+ // 4. set compaction level
+
_tablet->cumulative_compaction_policy()->update_compaction_level(_tablet.get(),
_input_rowsets,
+
_output_rowset);
+
+ // 5. set cumulative point
Version last_version = _input_rowsets.back()->version();
_tablet->cumulative_compaction_policy()->update_cumulative_point(_tablet.get(),
_input_rowsets,
_output_rowset, last_version);
diff --git a/be/src/olap/rowset/rowset_meta.h b/be/src/olap/rowset/rowset_meta.h
index 7e1dfaa57c3..30457d30bc6 100644
--- a/be/src/olap/rowset/rowset_meta.h
+++ b/be/src/olap/rowset/rowset_meta.h
@@ -304,6 +304,12 @@ public:
const TabletSchemaSPtr& tablet_schema() { return _schema; }
+ void set_compaction_level(int64_t compaction_level) {
+ _rowset_meta_pb.set_compaction_level(compaction_level);
+ }
+
+ int64_t compaction_level() { return _rowset_meta_pb.compaction_level(); }
+
// Because the member field '_handle' is a raw pointer, use member func
'init' to replace copy ctor
RowsetMeta(const RowsetMeta&) = delete;
RowsetMeta operator=(const RowsetMeta&) = delete;
diff --git a/be/src/olap/tablet_meta.cpp b/be/src/olap/tablet_meta.cpp
index 07cc07bc873..23f4428f747 100644
--- a/be/src/olap/tablet_meta.cpp
+++ b/be/src/olap/tablet_meta.cpp
@@ -71,7 +71,8 @@ TabletMetaSharedPtr TabletMeta::create(
request.time_series_compaction_file_count_threshold,
request.time_series_compaction_time_threshold_seconds,
request.time_series_compaction_empty_rowsets_threshold,
- request.inverted_index_storage_format);
+ request.inverted_index_storage_format,
+ request.time_series_compaction_level_threshold);
}
TabletMeta::TabletMeta()
@@ -91,7 +92,8 @@ TabletMeta::TabletMeta(int64_t table_id, int64_t
partition_id, int64_t tablet_id
int64_t time_series_compaction_file_count_threshold,
int64_t time_series_compaction_time_threshold_seconds,
int64_t time_series_compaction_empty_rowsets_threshold,
- TInvertedIndexStorageFormat::type
inverted_index_storage_format)
+ TInvertedIndexStorageFormat::type
inverted_index_storage_format,
+ int64_t time_series_compaction_level_threshold)
: _tablet_uid(0, 0),
_schema(new TabletSchema),
_delete_bitmap(new DeleteBitmap(tablet_id)) {
@@ -121,6 +123,8 @@ TabletMeta::TabletMeta(int64_t table_id, int64_t
partition_id, int64_t tablet_id
time_series_compaction_time_threshold_seconds);
tablet_meta_pb.set_time_series_compaction_empty_rowsets_threshold(
time_series_compaction_empty_rowsets_threshold);
+ tablet_meta_pb.set_time_series_compaction_level_threshold(
+ time_series_compaction_level_threshold);
TabletSchemaPB* schema = tablet_meta_pb.mutable_schema();
schema->set_num_short_key_columns(tablet_schema.short_key_column_count);
schema->set_num_rows_per_row_block(config::default_num_rows_per_column_file_block);
@@ -334,7 +338,8 @@ TabletMeta::TabletMeta(const TabletMeta& b)
_time_series_compaction_time_threshold_seconds(
b._time_series_compaction_time_threshold_seconds),
_time_series_compaction_empty_rowsets_threshold(
- b._time_series_compaction_empty_rowsets_threshold) {};
+ b._time_series_compaction_empty_rowsets_threshold),
+
_time_series_compaction_level_threshold(b._time_series_compaction_level_threshold)
{};
void TabletMeta::init_column_from_tcolumn(uint32_t unique_id, const TColumn&
tcolumn,
ColumnPB* column) {
@@ -635,6 +640,8 @@ void TabletMeta::init_from_pb(const TabletMetaPB&
tablet_meta_pb) {
tablet_meta_pb.time_series_compaction_time_threshold_seconds();
_time_series_compaction_empty_rowsets_threshold =
tablet_meta_pb.time_series_compaction_empty_rowsets_threshold();
+ _time_series_compaction_level_threshold =
+ tablet_meta_pb.time_series_compaction_level_threshold();
}
void TabletMeta::to_meta_pb(TabletMetaPB* tablet_meta_pb) {
@@ -718,6 +725,8 @@ void TabletMeta::to_meta_pb(TabletMetaPB* tablet_meta_pb) {
time_series_compaction_time_threshold_seconds());
tablet_meta_pb->set_time_series_compaction_empty_rowsets_threshold(
time_series_compaction_empty_rowsets_threshold());
+ tablet_meta_pb->set_time_series_compaction_level_threshold(
+ time_series_compaction_level_threshold());
}
int64_t TabletMeta::mem_size() const {
@@ -906,6 +915,8 @@ bool operator==(const TabletMeta& a, const TabletMeta& b) {
if (a._time_series_compaction_empty_rowsets_threshold !=
b._time_series_compaction_empty_rowsets_threshold)
return false;
+ if (a._time_series_compaction_level_threshold !=
b._time_series_compaction_level_threshold)
+ return false;
return true;
}
diff --git a/be/src/olap/tablet_meta.h b/be/src/olap/tablet_meta.h
index 4400be42af9..d21354366a9 100644
--- a/be/src/olap/tablet_meta.h
+++ b/be/src/olap/tablet_meta.h
@@ -112,7 +112,8 @@ public:
int64_t time_series_compaction_time_threshold_seconds = 3600,
int64_t time_series_compaction_empty_rowsets_threshold = 5,
TInvertedIndexStorageFormat::type inverted_index_storage_format
=
- TInvertedIndexStorageFormat::V1);
+ TInvertedIndexStorageFormat::V1,
+ int64_t time_series_compaction_level_threshold = 1);
// If need add a filed in TableMeta, filed init copy in copy construct
function
TabletMeta(const TabletMeta& tablet_meta);
TabletMeta(TabletMeta&& tablet_meta) = delete;
@@ -260,6 +261,12 @@ public:
int64_t time_series_compaction_empty_rowsets_threshold() const {
return _time_series_compaction_empty_rowsets_threshold;
}
+ void set_time_series_compaction_level_threshold(int64_t level_threshold) {
+ _time_series_compaction_level_threshold = level_threshold;
+ }
+ int64_t time_series_compaction_level_threshold() const {
+ return _time_series_compaction_level_threshold;
+ }
private:
Status _save_meta(DataDir* data_dir);
@@ -313,6 +320,7 @@ private:
int64_t _time_series_compaction_file_count_threshold = 0;
int64_t _time_series_compaction_time_threshold_seconds = 0;
int64_t _time_series_compaction_empty_rowsets_threshold = 0;
+ int64_t _time_series_compaction_level_threshold = 0;
mutable std::shared_mutex _meta_lock;
};
diff --git
a/docs/en/docs/sql-manual/sql-reference/Data-Definition-Statements/Create/CREATE-TABLE.md
b/docs/en/docs/sql-manual/sql-reference/Data-Definition-Statements/Create/CREATE-TABLE.md
index 44dbf5cceb8..0bf9590ebf1 100644
---
a/docs/en/docs/sql-manual/sql-reference/Data-Definition-Statements/Create/CREATE-TABLE.md
+++
b/docs/en/docs/sql-manual/sql-reference/Data-Definition-Statements/Create/CREATE-TABLE.md
@@ -461,6 +461,15 @@ Set table properties. The following attributes are
currently supported:
`"time_series_compaction_time_threshold_seconds" = "3600"`
+* `time_series_compaction_level_threshold`
+
+ When the time series compaction policy is applied, this parameter defaults to
1. When set to 2, it controls the re-merging of segments that have already
been
+
+ merged once, ensuring that the segment size reaches
time_series_compaction_goal_size_mbytes, which reduces the number of
+
+ segments.
+
+ `"time_series_compaction_level_threshold" = "2"`
* Dynamic partition related
diff --git
a/docs/zh-CN/docs/sql-manual/sql-reference/Data-Definition-Statements/Create/CREATE-TABLE.md
b/docs/zh-CN/docs/sql-manual/sql-reference/Data-Definition-Statements/Create/CREATE-TABLE.md
index e8ac8e69f0a..657adea881d 100644
---
a/docs/zh-CN/docs/sql-manual/sql-reference/Data-Definition-Statements/Create/CREATE-TABLE.md
+++
b/docs/zh-CN/docs/sql-manual/sql-reference/Data-Definition-Statements/Create/CREATE-TABLE.md
@@ -444,6 +444,14 @@ UNIQUE KEY(k1, k2)
`"time_series_compaction_time_threshold_seconds" = "3600"`
+* `time_series_compaction_level_threshold`
+
+ compaction 的合并策略为 time_series
时,此参数默认为1,当设置为2时用来控制对于合并过一次的段再合并一层,保证段大小达到time_series_compaction_goal_size_mbytes,
+
+ 能达到段数量减少的效果。
+
+ `"time_series_compaction_level_threshold" = "2"`
+
* 动态分区相关
动态分区相关参数如下:
diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java
b/fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java
index 63759885294..1d408b6799f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java
@@ -509,7 +509,9 @@ public class Alter {
|| properties
.containsKey(PropertyAnalyzer.PROPERTIES_SKIP_WRITE_INDEX_ON_LOAD)
|| properties
-
.containsKey(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_EMPTY_ROWSETS_THRESHOLD));
+
.containsKey(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_EMPTY_ROWSETS_THRESHOLD)
+ || properties
+
.containsKey(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_LEVEL_THRESHOLD));
((SchemaChangeHandler)
schemaChangeHandler).updateTableProperties(db, tableName, properties);
} else {
throw new DdlException("Invalid alter operation: " +
alterClause.getOpType());
diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java
b/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java
index f60a47632ab..61e4496a358 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java
@@ -276,6 +276,7 @@ public class RollupJobV2 extends AlterJobV2 implements
GsonPostProcessable {
tbl.getTimeSeriesCompactionFileCountThreshold(),
tbl.getTimeSeriesCompactionTimeThresholdSeconds(),
tbl.getTimeSeriesCompactionEmptyRowsetsThreshold(),
+ tbl.getTimeSeriesCompactionLevelThreshold(),
tbl.storeRowColumn(),
binlogConfig);
createReplicaTask.setBaseTablet(tabletIdMap.get(rollupTabletId),
baseSchemaHash);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java
b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java
index 55c939a4669..b4d032dc78c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java
@@ -2234,6 +2234,13 @@ public class SchemaChangeHandler extends AlterHandler {
.get(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_EMPTY_ROWSETS_THRESHOLD)));
}
+ if
(properties.containsKey(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_LEVEL_THRESHOLD))
{
+ timeSeriesCompactionConfig
+
.put(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_LEVEL_THRESHOLD,
+ Long.parseLong(properties
+
.get(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_LEVEL_THRESHOLD)));
+ }
+
if (isInMemory < 0 && storagePolicyId < 0 && compactionPolicy == null
&& timeSeriesCompactionConfig.isEmpty()
&&
!properties.containsKey(PropertyAnalyzer.PROPERTIES_IS_BEING_SYNCED)
&&
!properties.containsKey(PropertyAnalyzer.PROPERTIES_ENABLE_SINGLE_REPLICA_COMPACTION)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java
b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java
index 4a8588a0ff2..011cc53979f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java
@@ -281,6 +281,7 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
tbl.getTimeSeriesCompactionFileCountThreshold(),
tbl.getTimeSeriesCompactionTimeThresholdSeconds(),
tbl.getTimeSeriesCompactionEmptyRowsetsThreshold(),
+
tbl.getTimeSeriesCompactionLevelThreshold(),
tbl.storeRowColumn(),
binlogConfig);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/analysis/ModifyTablePropertiesClause.java
b/fe/fe-core/src/main/java/org/apache/doris/analysis/ModifyTablePropertiesClause.java
index cc4dede8f12..c2bc7bc7d0d 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/analysis/ModifyTablePropertiesClause.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/analysis/ModifyTablePropertiesClause.java
@@ -222,6 +222,23 @@ public class ModifyTablePropertiesClause extends
AlterTableClause {
}
this.needTableStable = false;
this.opType = AlterOpType.MODIFY_TABLE_PROPERTY_SYNC;
+ } else if
(properties.containsKey(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_LEVEL_THRESHOLD))
{
+ long levelThreshold;
+ String levelThresholdStr = properties
+
.get(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_LEVEL_THRESHOLD);
+ try {
+ levelThreshold = Long.parseLong(levelThresholdStr);
+ if (levelThreshold < 1 || levelThreshold > 2) {
+ throw new AnalysisException(
+ "time_series_compaction_level_threshold can not be
less than 1 or greater than 2:"
+ + levelThresholdStr);
+ }
+ } catch (NumberFormatException e) {
+ throw new AnalysisException("Invalid
time_series_compaction_level_threshold format: "
+ + levelThresholdStr);
+ }
+ this.needTableStable = false;
+ this.opType = AlterOpType.MODIFY_TABLE_PROPERTY_SYNC;
} else if
(properties.containsKey(PropertyAnalyzer.PROPERTIES_SKIP_WRITE_INDEX_ON_LOAD)) {
if
(!properties.get(PropertyAnalyzer.PROPERTIES_SKIP_WRITE_INDEX_ON_LOAD).equalsIgnoreCase("true")
&& !properties.get(PropertyAnalyzer
diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
index d4470a1029e..2e6bbb162ac 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
@@ -1084,6 +1084,7 @@ public class RestoreJob extends AbstractJob {
localTbl.getTimeSeriesCompactionFileCountThreshold(),
localTbl.getTimeSeriesCompactionTimeThresholdSeconds(),
localTbl.getTimeSeriesCompactionEmptyRowsetsThreshold(),
+ localTbl.getTimeSeriesCompactionLevelThreshold(),
localTbl.storeRowColumn(),
binlogConfig);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
index 5bf2f3eb1b5..632597bf6df 100755
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
@@ -3445,6 +3445,14 @@ public class Env {
sb.append(olapTable.getTimeSeriesCompactionEmptyRowsetsThreshold()).append("\"");
}
+ // time series compaction level threshold
+ if (olapTable.getCompactionPolicy() != null &&
olapTable.getCompactionPolicy()
+
.equals(PropertyAnalyzer.TIME_SERIES_COMPACTION_POLICY)) {
+ sb.append(",\n\"").append(PropertyAnalyzer
+
.PROPERTIES_TIME_SERIES_COMPACTION_LEVEL_THRESHOLD).append("\" = \"");
+
sb.append(olapTable.getTimeSeriesCompactionLevelThreshold()).append("\"");
+ }
+
// disable auto compaction
sb.append(",\n\"").append(PropertyAnalyzer.PROPERTIES_DISABLE_AUTO_COMPACTION).append("\"
= \"");
sb.append(olapTable.disableAutoCompaction()).append("\"");
@@ -4874,7 +4882,8 @@ public class Env {
.buildSkipWriteIndexOnLoad()
.buildDisableAutoCompaction()
.buildEnableSingleReplicaCompaction()
- .buildTimeSeriesCompactionEmptyRowsetsThreshold();
+ .buildTimeSeriesCompactionEmptyRowsetsThreshold()
+ .buildTimeSeriesCompactionLevelThreshold();
// need to update partition info meta
for (Partition partition : table.getPartitions()) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
index 0badc90fd3f..78e463a8a23 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
@@ -2162,6 +2162,20 @@ public class OlapTable extends Table implements
MTMVRelatedTableIf {
return null;
}
+ public void setTimeSeriesCompactionLevelThreshold(long
timeSeriesCompactionLevelThreshold) {
+ TableProperty tableProperty = getOrCreatTableProperty();
+
tableProperty.modifyTableProperties(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_LEVEL_THRESHOLD,
+
Long.valueOf(timeSeriesCompactionLevelThreshold).toString());
+ tableProperty.buildTimeSeriesCompactionLevelThreshold();
+ }
+
+ public Long getTimeSeriesCompactionLevelThreshold() {
+ if (tableProperty != null) {
+ return tableProperty.timeSeriesCompactionLevelThreshold();
+ }
+ return null;
+ }
+
public int getBaseSchemaVersion() {
MaterializedIndexMeta baseIndexMeta = indexIdToMeta.get(baseIndexId);
return baseIndexMeta.getSchemaVersion();
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/catalog/TableProperty.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/TableProperty.java
index 0846579fa18..467e53861dd 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/TableProperty.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/TableProperty.java
@@ -106,6 +106,9 @@ public class TableProperty implements Writable {
private long timeSeriesCompactionEmptyRowsetsThreshold
=
PropertyAnalyzer.TIME_SERIES_COMPACTION_EMPTY_ROWSETS_THRESHOLD_DEFAULT_VALUE;
+ private long timeSeriesCompactionLevelThreshold
+ =
PropertyAnalyzer.TIME_SERIES_COMPACTION_LEVEL_THRESHOLD_DEFAULT_VALUE;
+
private DataSortInfo dataSortInfo = new DataSortInfo();
public TableProperty(Map<String, String> properties) {
@@ -143,6 +146,7 @@ public class TableProperty implements Writable {
buildEnableSingleReplicaCompaction();
buildDisableAutoCompaction();
buildTimeSeriesCompactionEmptyRowsetsThreshold();
+ buildTimeSeriesCompactionLevelThreshold();
break;
default:
break;
@@ -300,6 +304,17 @@ public class TableProperty implements Writable {
return timeSeriesCompactionEmptyRowsetsThreshold;
}
+    /**
+     * Parses the level threshold from the property map, substituting
+     * TIME_SERIES_COMPACTION_LEVEL_THRESHOLD_DEFAULT_VALUE when the key is absent,
+     * and stores the result in timeSeriesCompactionLevelThreshold.
+     *
+     * @return this TableProperty, so that build calls can be chained
+     */
+    public TableProperty buildTimeSeriesCompactionLevelThreshold() {
+        String fallback = String.valueOf(
+                PropertyAnalyzer.TIME_SERIES_COMPACTION_LEVEL_THRESHOLD_DEFAULT_VALUE);
+        String rawValue = properties.getOrDefault(
+                PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_LEVEL_THRESHOLD, fallback);
+        timeSeriesCompactionLevelThreshold = Long.parseLong(rawValue);
+        return this;
+    }
+
+    /**
+     * Returns the current value of the timeSeriesCompactionLevelThreshold field.
+     */
+    public long timeSeriesCompactionLevelThreshold() {
+        return this.timeSeriesCompactionLevelThreshold;
+    }
+
public TableProperty buildMinLoadReplicaNum() {
minLoadReplicaNum = Short.parseShort(
properties.getOrDefault(PropertyAnalyzer.PROPERTIES_MIN_LOAD_REPLICA_NUM,
"-1"));
@@ -588,7 +603,8 @@ public class TableProperty implements Writable {
.buildTimeSeriesCompactionTimeThresholdSeconds()
.buildDisableAutoCompaction()
.buildEnableSingleReplicaCompaction()
- .buildTimeSeriesCompactionEmptyRowsetsThreshold();
+ .buildTimeSeriesCompactionEmptyRowsetsThreshold()
+ .buildTimeSeriesCompactionLevelThreshold();
if (Env.getCurrentEnvJournalVersion() < FeMetaVersion.VERSION_105) {
// get replica num from property map and create replica allocation
String repNum =
tableProperty.properties.remove(PropertyAnalyzer.PROPERTIES_REPLICATION_NUM);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java
b/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java
index 9bacb50db32..9737d95e2e6 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java
@@ -151,6 +151,9 @@ public class PropertyAnalyzer {
public static final String
PROPERTIES_TIME_SERIES_COMPACTION_EMPTY_ROWSETS_THRESHOLD =
"time_series_compaction_empty_rowsets_threshold";
+ public static final String
PROPERTIES_TIME_SERIES_COMPACTION_LEVEL_THRESHOLD =
+ "time_series_compaction_level_threshold";
+
public static final String PROPERTIES_MUTABLE = "mutable";
public static final String PROPERTIES_IS_BEING_SYNCED = "is_being_synced";
@@ -198,7 +201,7 @@ public class PropertyAnalyzer {
public static final long
TIME_SERIES_COMPACTION_FILE_COUNT_THRESHOLD_DEFAULT_VALUE = 2000;
public static final long
TIME_SERIES_COMPACTION_TIME_THRESHOLD_SECONDS_DEFAULT_VALUE = 3600;
public static final long
TIME_SERIES_COMPACTION_EMPTY_ROWSETS_THRESHOLD_DEFAULT_VALUE = 5;
-
+ public static final long
TIME_SERIES_COMPACTION_LEVEL_THRESHOLD_DEFAULT_VALUE = 1;
/**
* check and replace members of DataProperty by properties.
@@ -751,6 +754,30 @@ public class PropertyAnalyzer {
return emptyRowsetsThreshold;
}
+    /**
+     * Reads and validates the time_series_compaction_level_threshold property.
+     *
+     * <p>When the key is present it is removed from {@code properties} before parsing.
+     *
+     * @param properties table properties; may be null or empty
+     * @return the parsed threshold, or TIME_SERIES_COMPACTION_LEVEL_THRESHOLD_DEFAULT_VALUE
+     *         when the map is null, empty, or does not contain the key
+     * @throws AnalysisException if the value is not a parsable long, or is less than 1
+     *         or greater than 2
+     */
+    public static long analyzeTimeSeriesCompactionLevelThreshold(Map<String, String> properties)
+            throws AnalysisException {
+        long levelThreshold = TIME_SERIES_COMPACTION_LEVEL_THRESHOLD_DEFAULT_VALUE;
+        if (properties == null || properties.isEmpty()) {
+            return levelThreshold;
+        }
+        if (properties.containsKey(PROPERTIES_TIME_SERIES_COMPACTION_LEVEL_THRESHOLD)) {
+            String levelThresholdStr = properties
+                    .get(PROPERTIES_TIME_SERIES_COMPACTION_LEVEL_THRESHOLD);
+            properties.remove(PROPERTIES_TIME_SERIES_COMPACTION_LEVEL_THRESHOLD);
+            try {
+                levelThreshold = Long.parseLong(levelThresholdStr);
+                if (levelThreshold < 1 || levelThreshold > 2) {
+                    throw new AnalysisException("time_series_compaction_level_threshold can not"
+                            + " less than 1 or greater than 2: " + levelThreshold);
+                }
+            } catch (NumberFormatException e) {
+                // Report the rejected input text: levelThreshold still holds the default
+                // here because the parse failed before the assignment.
+                throw new AnalysisException("Invalid time_series_compaction_level_threshold: "
+                        + levelThresholdStr);
+            }
+        }
+        return levelThreshold;
+    }
+
public static long
analyzeTimeSeriesCompactionFileCountThreshold(Map<String, String> properties)
throws AnalysisException {
long fileCountThreshold =
TIME_SERIES_COMPACTION_FILE_COUNT_THRESHOLD_DEFAULT_VALUE;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
index b6b89c0ad7d..4e306a246c7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
@@ -1482,6 +1482,10 @@ public class InternalCatalog implements
CatalogIf<Database> {
properties.put(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_EMPTY_ROWSETS_THRESHOLD,
olapTable.getTimeSeriesCompactionEmptyRowsetsThreshold().toString());
}
+ if
(!properties.containsKey(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_LEVEL_THRESHOLD))
{
+
properties.put(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_LEVEL_THRESHOLD,
+
olapTable.getTimeSeriesCompactionLevelThreshold().toString());
+ }
if
(!properties.containsKey(PropertyAnalyzer.PROPERTIES_STORAGE_POLICY)) {
properties.put(PropertyAnalyzer.PROPERTIES_STORAGE_POLICY,
olapTable.getStoragePolicy());
}
@@ -1909,6 +1913,7 @@ public class InternalCatalog implements
CatalogIf<Database> {
tbl.getTimeSeriesCompactionFileCountThreshold(),
tbl.getTimeSeriesCompactionTimeThresholdSeconds(),
tbl.getTimeSeriesCompactionEmptyRowsetsThreshold(),
+ tbl.getTimeSeriesCompactionLevelThreshold(),
tbl.storeRowColumn(), binlogConfig);
task.setStorageFormat(tbl.getStorageFormat());
@@ -2176,7 +2181,9 @@ public class InternalCatalog implements
CatalogIf<Database> {
|| properties
.containsKey(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_TIME_THRESHOLD_SECONDS)
|| properties
-
.containsKey(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_EMPTY_ROWSETS_THRESHOLD)))
{
+
.containsKey(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_EMPTY_ROWSETS_THRESHOLD)
+ || properties
+
.containsKey(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_LEVEL_THRESHOLD)))
{
throw new DdlException("only time series compaction policy support
for time series config");
}
@@ -2224,6 +2231,17 @@ public class InternalCatalog implements
CatalogIf<Database> {
}
olapTable.setTimeSeriesCompactionEmptyRowsetsThreshold(timeSeriesCompactionEmptyRowsetsThreshold);
+ // set time series compaction level threshold
+ long timeSeriesCompactionLevelThreshold
+ =
PropertyAnalyzer.TIME_SERIES_COMPACTION_LEVEL_THRESHOLD_DEFAULT_VALUE;
+ try {
+ timeSeriesCompactionLevelThreshold = PropertyAnalyzer
+
.analyzeTimeSeriesCompactionLevelThreshold(properties);
+ } catch (AnalysisException e) {
+ throw new DdlException(e.getMessage());
+ }
+
olapTable.setTimeSeriesCompactionLevelThreshold(timeSeriesCompactionLevelThreshold);
+
// get storage format
TStorageFormat storageFormat = TStorageFormat.V2; // default is
segment v2
try {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java
b/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java
index 6950dc83e9a..6b24bea1ed5 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java
@@ -849,6 +849,7 @@ public class ReportHandler extends Daemon {
olapTable.getTimeSeriesCompactionFileCountThreshold(),
olapTable.getTimeSeriesCompactionTimeThresholdSeconds(),
olapTable.getTimeSeriesCompactionEmptyRowsetsThreshold(),
+
olapTable.getTimeSeriesCompactionLevelThreshold(),
olapTable.storeRowColumn(),
binlogConfig);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/task/CreateReplicaTask.java
b/fe/fe-core/src/main/java/org/apache/doris/task/CreateReplicaTask.java
index 262ac8e84fe..7a5262ba0ed 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/task/CreateReplicaTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/task/CreateReplicaTask.java
@@ -116,6 +116,8 @@ public class CreateReplicaTask extends AgentTask {
private long timeSeriesCompactionEmptyRowsetsThreshold;
+ private long timeSeriesCompactionLevelThreshold;
+
private boolean storeRowColumn;
private BinlogConfig binlogConfig;
@@ -140,6 +142,7 @@ public class CreateReplicaTask extends AgentTask {
long timeSeriesCompactionFileCountThreshold,
long timeSeriesCompactionTimeThresholdSeconds,
long timeSeriesCompactionEmptyRowsetsThreshold,
+ long timeSeriesCompactionLevelThreshold,
boolean storeRowColumn,
BinlogConfig binlogConfig) {
super(null, backendId, TTaskType.CREATE, dbId, tableId, partitionId,
indexId, tabletId);
@@ -182,6 +185,7 @@ public class CreateReplicaTask extends AgentTask {
this.timeSeriesCompactionFileCountThreshold =
timeSeriesCompactionFileCountThreshold;
this.timeSeriesCompactionTimeThresholdSeconds =
timeSeriesCompactionTimeThresholdSeconds;
this.timeSeriesCompactionEmptyRowsetsThreshold =
timeSeriesCompactionEmptyRowsetsThreshold;
+ this.timeSeriesCompactionLevelThreshold =
timeSeriesCompactionLevelThreshold;
this.storeRowColumn = storeRowColumn;
this.binlogConfig = binlogConfig;
}
@@ -343,6 +347,7 @@ public class CreateReplicaTask extends AgentTask {
createTabletReq.setTimeSeriesCompactionFileCountThreshold(timeSeriesCompactionFileCountThreshold);
createTabletReq.setTimeSeriesCompactionTimeThresholdSeconds(timeSeriesCompactionTimeThresholdSeconds);
createTabletReq.setTimeSeriesCompactionEmptyRowsetsThreshold(timeSeriesCompactionEmptyRowsetsThreshold);
+
createTabletReq.setTimeSeriesCompactionLevelThreshold(timeSeriesCompactionLevelThreshold);
if (binlogConfig != null) {
createTabletReq.setBinlogConfig(binlogConfig.toThrift());
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/task/UpdateTabletMetaInfoTask.java
b/fe/fe-core/src/main/java/org/apache/doris/task/UpdateTabletMetaInfoTask.java
index ad20b7b918d..7d4c6a3d022 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/task/UpdateTabletMetaInfoTask.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/task/UpdateTabletMetaInfoTask.java
@@ -164,6 +164,11 @@ public class UpdateTabletMetaInfoTask extends AgentTask {
metaInfo.setTimeSeriesCompactionEmptyRowsetsThreshold(timeSeriesCompactionConfig
.get(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_EMPTY_ROWSETS_THRESHOLD));
}
+ if (timeSeriesCompactionConfig
+
.containsKey(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_LEVEL_THRESHOLD))
{
+
metaInfo.setTimeSeriesCompactionLevelThreshold(timeSeriesCompactionConfig
+
.get(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_LEVEL_THRESHOLD));
+ }
}
if (enableSingleReplicaCompaction >= 0) {
metaInfo.setEnableSingleReplicaCompaction(enableSingleReplicaCompaction > 0);
diff --git a/fe/fe-core/src/test/java/org/apache/doris/task/AgentTaskTest.java
b/fe/fe-core/src/test/java/org/apache/doris/task/AgentTaskTest.java
index 60ec442bedf..3dc4bc4695c 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/task/AgentTaskTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/task/AgentTaskTest.java
@@ -107,7 +107,7 @@ public class AgentTaskTest {
createReplicaTask = new CreateReplicaTask(backendId1, dbId, tableId,
partitionId,
indexId1, tabletId1, replicaId1, shortKeyNum, schemaHash1,
version, KeysType.AGG_KEYS, storageType,
TStorageMedium.SSD, columns, null, 0, latch, null, false,
TTabletType.TABLET_TYPE_DISK, null,
- TCompressionType.LZ4F, false, "", false, false, false, "", 0,
0, 0, 0, false, null);
+ TCompressionType.LZ4F, false, "", false, false, false, "", 0,
0, 0, 0, 0, false, null);
// drop
dropTask = new DropReplicaTask(backendId1, tabletId1, replicaId1,
schemaHash1, false);
diff --git a/gensrc/proto/olap_file.proto b/gensrc/proto/olap_file.proto
index 5eedd8af079..bea90d86006 100644
--- a/gensrc/proto/olap_file.proto
+++ b/gensrc/proto/olap_file.proto
@@ -113,6 +113,7 @@ message RowsetMetaPB {
reserved 50;
// to indicate whether the data between the segments overlap
optional SegmentsOverlapPB segments_overlap_pb = 51 [default =
OVERLAP_UNKNOWN];
+ optional int64 compaction_level = 52 [default = 0];
}
message SegmentStatisticsPB {
@@ -345,6 +346,7 @@ message TabletMetaPB {
optional int64 time_series_compaction_file_count_threshold = 30 [default =
2000];
optional int64 time_series_compaction_time_threshold_seconds = 31 [default
= 3600];
optional int64 time_series_compaction_empty_rowsets_threshold = 32
[default = 5];
+ optional int64 time_series_compaction_level_threshold = 33 [default = 1];
}
message OLAPRawDeltaHeaderMessage {
diff --git a/gensrc/thrift/AgentService.thrift
b/gensrc/thrift/AgentService.thrift
index 99196c2e024..645d38eed72 100644
--- a/gensrc/thrift/AgentService.thrift
+++ b/gensrc/thrift/AgentService.thrift
@@ -160,6 +160,7 @@ struct TCreateTabletReq {
25: optional i64 time_series_compaction_time_threshold_seconds = 3600
26: optional i64 time_series_compaction_empty_rowsets_threshold = 5
27: optional TInvertedIndexStorageFormat inverted_index_storage_format =
TInvertedIndexStorageFormat.V1
+ 28: optional i64 time_series_compaction_level_threshold = 1
}
struct TDropTabletReq {
@@ -430,6 +431,7 @@ struct TTabletMetaInfo {
15: optional bool skip_write_index_on_load
16: optional bool disable_auto_compaction
17: optional i64 time_series_compaction_empty_rowsets_threshold
+ 18: optional i64 time_series_compaction_level_threshold
}
struct TUpdateTabletMetaInfoReq {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]