This is an automated email from the ASF dual-hosted git repository.
kxiao pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new 95b05928fd7 [fix](compaction) fix time series compaction merge empty
rowsets priority #34562 (#34765)
95b05928fd7 is described below
commit 95b05928fd7f920255f9ca34c4fff1a5159d7b94
Author: Sun Chenyang <[email protected]>
AuthorDate: Tue May 14 09:10:09 2024 +0800
[fix](compaction) fix time series compaction merge empty rowsets priority
#34562 (#34765)
---
.../cumulative_compaction_time_series_policy.cpp | 39 +++++------
be/src/olap/delta_writer.cpp | 3 +-
...mulative_compaction_time_series_policy_test.cpp | 80 ++++++++++++++++++++++
3 files changed, 101 insertions(+), 21 deletions(-)
diff --git a/be/src/olap/cumulative_compaction_time_series_policy.cpp
b/be/src/olap/cumulative_compaction_time_series_policy.cpp
index 6c3f949723a..3134364a4dd 100644
--- a/be/src/olap/cumulative_compaction_time_series_policy.cpp
+++ b/be/src/olap/cumulative_compaction_time_series_policy.cpp
@@ -73,13 +73,6 @@ uint32_t
TimeSeriesCumulativeCompactionPolicy::calc_cumulative_compaction_score(
return 0;
}
- // If there is a continuous set of empty rowsets, prioritize merging.
- auto consecutive_empty_rowsets =
tablet->pick_first_consecutive_empty_rowsets(
-
tablet->tablet_meta()->time_series_compaction_empty_rowsets_threshold());
- if (!consecutive_empty_rowsets.empty()) {
- return score;
- }
-
// Condition 1: the size of input files for compaction meets the
requirement of parameter compaction_goal_size
int64_t compaction_goal_size_mbytes =
tablet->tablet_meta()->time_series_compaction_goal_size_mbytes();
@@ -126,6 +119,13 @@ uint32_t
TimeSeriesCumulativeCompactionPolicy::calc_cumulative_compaction_score(
tablet->set_last_cumu_compaction_success_time(now);
}
+ // Condition 5: If there is a continuous set of empty rowsets, prioritize
merging.
+ auto consecutive_empty_rowsets =
tablet->pick_first_consecutive_empty_rowsets(
+
tablet->tablet_meta()->time_series_compaction_empty_rowsets_threshold());
+ if (!consecutive_empty_rowsets.empty()) {
+ return score;
+ }
+
return 0;
}
@@ -215,19 +215,6 @@ int
TimeSeriesCumulativeCompactionPolicy::pick_input_rowsets(
return 0;
}
- // If their are many empty rowsets, maybe should be compacted
- auto consecutive_empty_rowsets =
tablet->pick_first_consecutive_empty_rowsets(
-
tablet->tablet_meta()->time_series_compaction_empty_rowsets_threshold());
- if (!consecutive_empty_rowsets.empty()) {
- VLOG_NOTICE << "tablet is " << tablet->tablet_id()
- << ", there are too many consecutive empty rowsets, size
is "
- << consecutive_empty_rowsets.size();
- input_rowsets->clear();
- input_rowsets->insert(input_rowsets->end(),
consecutive_empty_rowsets.begin(),
- consecutive_empty_rowsets.end());
- return 0;
- }
-
int64_t compaction_goal_size_mbytes =
tablet->tablet_meta()->time_series_compaction_goal_size_mbytes();
@@ -338,6 +325,18 @@ int
TimeSeriesCumulativeCompactionPolicy::pick_input_rowsets(
}
input_rowsets->clear();
+ // Condition 5: If there are many consecutive empty rowsets, they should be compacted
+ auto consecutive_empty_rowsets =
tablet->pick_first_consecutive_empty_rowsets(
+
tablet->tablet_meta()->time_series_compaction_empty_rowsets_threshold());
+ if (!consecutive_empty_rowsets.empty()) {
+ VLOG_NOTICE << "tablet is " << tablet->tablet_id()
+ << ", there are too many consecutive empty rowsets, size
is "
+ << consecutive_empty_rowsets.size();
+ input_rowsets->clear();
+ input_rowsets->insert(input_rowsets->end(),
consecutive_empty_rowsets.begin(),
+ consecutive_empty_rowsets.end());
+ return 0;
+ }
*compaction_score = 0;
return 0;
diff --git a/be/src/olap/delta_writer.cpp b/be/src/olap/delta_writer.cpp
index 1ec4dd17313..bf7adadb943 100644
--- a/be/src/olap/delta_writer.cpp
+++ b/be/src/olap/delta_writer.cpp
@@ -112,7 +112,8 @@ Status BaseDeltaWriter::init() {
return Status::OK();
}
-Status BaseDeltaWriter::write(const vectorized::Block* block, const
std::vector<uint32_t>& row_idxs) {
+Status BaseDeltaWriter::write(const vectorized::Block* block,
+ const std::vector<uint32_t>& row_idxs) {
if (UNLIKELY(row_idxs.empty())) {
return Status::OK();
}
diff --git a/be/test/olap/cumulative_compaction_time_series_policy_test.cpp
b/be/test/olap/cumulative_compaction_time_series_policy_test.cpp
index 1390f1deb14..57872083edf 100644
--- a/be/test/olap/cumulative_compaction_time_series_policy_test.cpp
+++ b/be/test/olap/cumulative_compaction_time_series_policy_test.cpp
@@ -214,6 +214,58 @@ public:
rs_metas->push_back(ptr5);
}
+ void
init_all_rs_meta_empty_nonoverlapping(std::vector<RowsetMetaSharedPtr>*
rs_metas) { // Appends ten rowset metas to *rs_metas; ptr3..ptr9 get num_segments = 0.
+ RowsetMetaSharedPtr ptr1(new RowsetMeta());
+ init_rs_meta(ptr1, 0, 1);
+ ptr1->set_total_disk_size(1 * 1024);
+ rs_metas->push_back(ptr1);
+ // NOTE(review): presumably init_rs_meta(ptr, start, end) sets the version range; ptr1 and ptr2 keep its default segment count.
+ RowsetMetaSharedPtr ptr2(new RowsetMeta());
+ init_rs_meta(ptr2, 2, 3);
+ ptr2->set_total_disk_size(2 * 1024);
+ rs_metas->push_back(ptr2);
+ // ptr3..ptr9: seven single-version rowsets (4..10) with zero segments, presumably the consecutive empty run the test expects to be picked.
+ RowsetMetaSharedPtr ptr3(new RowsetMeta());
+ init_rs_meta(ptr3, 4, 4);
+ ptr3->set_num_segments(0);
+ rs_metas->push_back(ptr3);
+
+ RowsetMetaSharedPtr ptr4(new RowsetMeta());
+ init_rs_meta(ptr4, 5, 5);
+ ptr4->set_num_segments(0);
+ rs_metas->push_back(ptr4);
+
+ RowsetMetaSharedPtr ptr5(new RowsetMeta());
+ init_rs_meta(ptr5, 6, 6);
+ ptr5->set_num_segments(0);
+ rs_metas->push_back(ptr5);
+
+ RowsetMetaSharedPtr ptr6(new RowsetMeta());
+ init_rs_meta(ptr6, 7, 7);
+ ptr6->set_num_segments(0);
+ rs_metas->push_back(ptr6);
+
+ RowsetMetaSharedPtr ptr7(new RowsetMeta());
+ init_rs_meta(ptr7, 8, 8);
+ ptr7->set_num_segments(0);
+ rs_metas->push_back(ptr7);
+
+ RowsetMetaSharedPtr ptr8(new RowsetMeta());
+ init_rs_meta(ptr8, 9, 9);
+ ptr8->set_num_segments(0);
+ rs_metas->push_back(ptr8);
+
+ RowsetMetaSharedPtr ptr9(new RowsetMeta());
+ init_rs_meta(ptr9, 10, 10);
+ ptr9->set_num_segments(0);
+ rs_metas->push_back(ptr9);
+ // ptr10 does not zero its segment count, so it presumably terminates the empty run.
+ RowsetMetaSharedPtr ptr10(new RowsetMeta());
+ init_rs_meta(ptr10, 11, 11);
+ ptr10->set_total_disk_size(2 * 1024);
+ rs_metas->push_back(ptr10);
+ }
+
void init_rs_meta_pick_empty(std::vector<RowsetMetaSharedPtr>* rs_metas) {
RowsetMetaSharedPtr ptr1(new RowsetMeta());
init_rs_meta(ptr1, 0, 1);
@@ -597,6 +649,34 @@ TEST_F(TestTimeSeriesCumulativeCompactionPolicy,
_pick_missing_version_cumulativ
static_cast<void>(compaction.find_longest_consecutive_version(&rowsets3,
nullptr));
EXPECT_EQ(0, rowsets3.size());
}
+
+TEST_F(TestTimeSeriesCumulativeCompactionPolicy, pick_empty_rowsets) { // Expects pick_input_rowsets to select only the seven zero-segment rowsets.
+ std::vector<RowsetMetaSharedPtr> rs_metas;
+ init_all_rs_meta_empty_nonoverlapping(&rs_metas);
+ // Register every rowset meta; the add_rs_meta status is explicitly discarded.
+ for (auto& rowset : rs_metas) {
+ static_cast<void>(_tablet_meta->add_rs_meta(rowset));
+ }
+ // Build a tablet that uses the time-series cumulative compaction policy.
+ TabletSharedPtr _tablet(
+ new Tablet(_engine, _tablet_meta, nullptr,
CUMULATIVE_TIME_SERIES_POLICY));
+ static_cast<void>(_tablet->init());
+ _tablet->calculate_cumulative_point();
+ // Candidates are collected only after the cumulative point has been computed.
+ auto candidate_rowsets =
_tablet->pick_candidate_rowsets_to_cumulative_compaction();
+ // Output parameters written by pick_input_rowsets.
+ std::vector<RowsetSharedPtr> input_rowsets;
+ Version last_delete_version {-1, -1};
+ size_t compaction_score = 0;
+ // NOTE(review): 10 and 5 are presumably the max/min compaction score limits; confirm against the pick_input_rowsets signature. Its int return value is ignored here.
+ _tablet->_cumulative_compaction_policy->pick_input_rowsets(
+ _tablet.get(), candidate_rowsets, 10, 5, &input_rowsets,
&last_delete_version,
+ &compaction_score, config::enable_delete_when_cumu_compaction);
+ // Exactly the zero-segment rowsets (ptr3..ptr9) are expected, and no delete version should be reported.
+ EXPECT_EQ(7, input_rowsets.size());
+ EXPECT_EQ(-1, last_delete_version.first);
+ EXPECT_EQ(-1, last_delete_version.second);
+}
} // namespace doris
// @brief Test Stub
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]