This is an automated email from the ASF dual-hosted git repository.
kxiao pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.0 by this push:
new 110a7072ed4 [fix](compaction) remove single replica compaction
prepare input rowsets #33053 (#33478)
110a7072ed4 is described below
commit 110a7072ed4655a39e60cd664d660a0147546058
Author: Sun Chenyang <[email protected]>
AuthorDate: Thu Apr 11 09:49:37 2024 +0800
[fix](compaction) remove single replica compaction prepare input rowsets
#33053 (#33478)
---
be/src/olap/single_replica_compaction.cpp | 61 +++++--------
be/src/olap/single_replica_compaction.h | 2 +-
be/src/olap/tablet.cpp | 24 ++---
be/src/olap/tablet.h | 1 -
be/test/olap/single_compaction_test.cpp | 142 ++++++++++++++++++++++++++++++
5 files changed, 173 insertions(+), 57 deletions(-)
diff --git a/be/src/olap/single_replica_compaction.cpp
b/be/src/olap/single_replica_compaction.cpp
index 1bbdf95be55..ddd6e08bfa2 100644
--- a/be/src/olap/single_replica_compaction.cpp
+++ b/be/src/olap/single_replica_compaction.cpp
@@ -56,39 +56,12 @@ Status SingleReplicaCompaction::prepare_compact() {
return Status::Error<CUMULATIVE_INVALID_PARAMETERS, false>("_tablet
init failed");
}
- std::unique_lock<std::mutex>
lock_cumu(_tablet->get_cumulative_compaction_lock(),
- std::try_to_lock);
- if (!lock_cumu.owns_lock()) {
- LOG(INFO) << "The tablet is under cumulative compaction. tablet=" <<
_tablet->full_name();
- return Status::Error<TRY_LOCK_FAILED, false>(
- "The tablet is under cumulative compaction. tablet={}",
_tablet->full_name());
- }
- std::unique_lock<std::mutex>
lock_base(_tablet->get_base_compaction_lock(), std::try_to_lock);
- if (!lock_base.owns_lock()) {
- LOG(WARNING) << "another base compaction is running. tablet=" <<
_tablet->full_name();
- return Status::Error<TRY_LOCK_FAILED, false>(
- "another base compaction is running. tablet={}",
_tablet->full_name());
- }
-
- // 1. pick rowsets to compact
- RETURN_IF_ERROR(pick_rowsets_to_compact());
- if (_input_rowsets.size() == 1) {
- return
Status::Error<CUMULATIVE_NO_SUITABLE_VERSION>("_input_rowsets.size() is 1");
- }
-
+ // Single replica compaction does not require picking _input_rowsets
+ // _input_rowsets depends on the fetched _output_version
return Status::OK();
}
Status SingleReplicaCompaction::pick_rowsets_to_compact() {
- auto candidate_rowsets =
_tablet->pick_candidate_rowsets_to_single_replica_compaction();
- if (candidate_rowsets.empty()) {
- return
Status::Error<CUMULATIVE_NO_SUITABLE_VERSION>("candidate_rowsets is empty");
- }
- _input_rowsets.clear();
- for (const auto& rowset : candidate_rowsets) {
- _input_rowsets.emplace_back(rowset);
- }
-
return Status::OK();
}
@@ -109,10 +82,9 @@ Status SingleReplicaCompaction::execute_compact_impl() {
SCOPED_ATTACH_TASK(_mem_tracker);
- // 2. do single replica compaction
+ // do single replica compaction
RETURN_IF_ERROR(_do_single_replica_compaction());
- // 3. set state to success
_state = CompactionState::SUCCESS;
return Status::OK();
@@ -220,10 +192,8 @@ Status
SingleReplicaCompaction::_get_rowset_verisons_from_peer(
bool SingleReplicaCompaction::_find_rowset_to_fetch(const
std::vector<Version>& peer_versions,
Version* proper_version) {
- // get local versions
+ // already sorted
std::vector<Version> local_versions = _tablet->get_all_versions();
- std::sort(local_versions.begin(), local_versions.end(),
- [](const Version& left, const Version& right) { return
left.first < right.first; });
for (const auto& v : local_versions) {
VLOG_CRITICAL << _tablet->tablet_id() << " tablet local version: " <<
v.first << " - "
<< v.second;
@@ -274,13 +244,24 @@ bool SingleReplicaCompaction::_find_rowset_to_fetch(const
std::vector<Version>&
}
if (find) {
// 4. reset input rowsets
- auto rs_iter = _input_rowsets.begin();
- while (rs_iter != _input_rowsets.end()) {
- if ((*proper_version).contains((*rs_iter)->version())) {
- ++rs_iter;
- continue;
+ _input_rowsets.clear();
+ _tablet->traverse_rowsets([this, &proper_version](const auto& rs) {
+ // only need rowset in proper_version
+ if (rs->is_local() && proper_version->contains(rs->version())) {
+ this->_input_rowsets.emplace_back(rs);
}
- rs_iter = _input_rowsets.erase(rs_iter);
+ });
+ std::sort(_input_rowsets.begin(), _input_rowsets.end(),
Rowset::comparator);
+ DCHECK_EQ(_input_rowsets.front()->start_version(),
proper_version->first);
+ DCHECK_EQ(_input_rowsets.back()->end_version(),
proper_version->second);
+ if (_input_rowsets.front()->start_version() != proper_version->first ||
+ _input_rowsets.back()->end_version() != proper_version->second) {
+ LOG(WARNING) << fmt::format(
+ "single compaction input rowsets error, tablet_id={},
input rowset = [{}-{}], "
+ "remote rowset = {}",
+ _tablet->tablet_id(),
_input_rowsets.front()->start_version(),
+ _input_rowsets.back()->end_version(),
proper_version->to_string());
+ return false;
}
for (auto& rowset : _input_rowsets) {
_input_rowsets_size += rowset->data_disk_size();
diff --git a/be/src/olap/single_replica_compaction.h
b/be/src/olap/single_replica_compaction.h
index 3d967dff20b..ae013b3748d 100644
--- a/be/src/olap/single_replica_compaction.h
+++ b/be/src/olap/single_replica_compaction.h
@@ -47,7 +47,7 @@ protected:
private:
Status _do_single_replica_compaction();
Status _do_single_replica_compaction_impl();
- bool _find_rowset_to_fetch(const std::vector<Version>& peer_versions,
Version* peer_version);
+ bool _find_rowset_to_fetch(const std::vector<Version>& peer_versions,
Version* proper_version);
Status _get_rowset_verisons_from_peer(const TReplicaInfo& addr,
std::vector<Version>* peer_versions);
Status _fetch_rowset(const TReplicaInfo& addr, const std::string& token,
diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp
index 9b745a92479..94aee96a260 100644
--- a/be/src/olap/tablet.cpp
+++ b/be/src/olap/tablet.cpp
@@ -1334,20 +1334,6 @@ std::vector<RowsetSharedPtr>
Tablet::pick_candidate_rowsets_to_cumulative_compac
return candidate_rowsets;
}
-std::vector<RowsetSharedPtr>
Tablet::pick_candidate_rowsets_to_single_replica_compaction() {
- std::vector<RowsetSharedPtr> candidate_rowsets;
- {
- std::shared_lock rlock(_meta_lock);
- for (const auto& [version, rs] : _rs_version_map) {
- if (rs->is_local()) {
- candidate_rowsets.push_back(rs);
- }
- }
- }
- std::sort(candidate_rowsets.begin(), candidate_rowsets.end(),
Rowset::comparator);
- return candidate_rowsets;
-}
-
std::vector<RowsetSharedPtr>
Tablet::pick_candidate_rowsets_to_base_compaction() {
std::vector<RowsetSharedPtr> candidate_rowsets;
{
@@ -1364,7 +1350,15 @@ std::vector<RowsetSharedPtr>
Tablet::pick_candidate_rowsets_to_base_compaction()
}
std::vector<RowsetSharedPtr>
Tablet::pick_candidate_rowsets_to_full_compaction() {
- return pick_candidate_rowsets_to_single_replica_compaction();
+ std::vector<RowsetSharedPtr> candidate_rowsets;
+ traverse_rowsets([&candidate_rowsets](const auto& rs) {
+ // Do full compaction on all local rowsets.
+ if (rs->is_local()) {
+ candidate_rowsets.emplace_back(rs);
+ }
+ });
+ std::sort(candidate_rowsets.begin(), candidate_rowsets.end(),
Rowset::comparator);
+ return candidate_rowsets;
}
std::vector<RowsetSharedPtr> Tablet::pick_first_consecutive_empty_rowsets(int
limit) {
diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h
index d94990e4071..6201924e944 100644
--- a/be/src/olap/tablet.h
+++ b/be/src/olap/tablet.h
@@ -273,7 +273,6 @@ public:
std::vector<RowsetSharedPtr>
pick_candidate_rowsets_to_build_inverted_index(
const std::set<int32_t>& alter_index_uids, bool is_drop_op);
- std::vector<RowsetSharedPtr>
pick_candidate_rowsets_to_single_replica_compaction();
std::vector<Version> get_all_versions();
std::vector<RowsetSharedPtr> pick_first_consecutive_empty_rowsets(int
limit);
diff --git a/be/test/olap/single_compaction_test.cpp
b/be/test/olap/single_compaction_test.cpp
new file mode 100644
index 00000000000..b190bf9e2a9
--- /dev/null
+++ b/be/test/olap/single_compaction_test.cpp
@@ -0,0 +1,142 @@
+
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <gtest/gtest-message.h>
+#include <gtest/gtest-test-part.h>
+
+#include <memory>
+#include <ostream>
+#include <string>
+#include <vector>
+
+#include "common/status.h"
+#include "gtest/gtest_pred_impl.h"
+#include "olap/data_dir.h"
+#include "olap/olap_common.h"
+#include "olap/rowset/beta_rowset.h"
+#include "olap/rowset/rowset.h"
+#include "olap/rowset/rowset_meta.h"
+#include "olap/single_replica_compaction.h"
+#include "olap/storage_engine.h"
+#include "olap/tablet.h"
+#include "olap/tablet_meta.h"
+namespace doris {
+
+// Fixture for SingleReplicaCompaction unit tests: owns a storage engine (used to
+// mint rowset ids) and a DataDir that the test tablets are attached to.
+class SingleCompactionTest : public ::testing::Test {
+protected:
+    void SetUp() override {
+        const std::string dir_path = "ut_dir/single_compact_test";
+        // Owned via unique_ptr so nothing leaks if SetUp/TearDown is skipped or aborted.
+        _engine = std::make_unique<StorageEngine>(EngineOptions {});
+        _data_dir = std::make_unique<DataDir>(dir_path);
+    }
+
+    // Builds a minimal tablet (meta fields only) attached to _data_dir.
+    TabletSharedPtr create_tablet(int64_t tablet_id) {
+        auto tablet_meta = std::make_shared<TabletMeta>();
+        tablet_meta->_tablet_id = tablet_id;
+        (void)tablet_meta->set_partition_id(10000);
+        tablet_meta->set_tablet_uid({tablet_id, 0});
+        tablet_meta->set_shard_id(tablet_id % 4);
+        tablet_meta->_schema_hash = tablet_id;
+        return std::make_shared<Tablet>(std::move(tablet_meta), _data_dir.get());
+    }
+
+    // Builds a BetaRowset for `tablet` covering versions [start, end].
+    // Only rowset metadata is filled in; no segment data is written. No remote
+    // resource is set, so the rowset is presumably local — the tests rely on that.
+    auto create_rowset(const TabletSharedPtr& tablet, int64_t start, int64_t end) {
+        auto rowset_meta = std::make_shared<RowsetMeta>();
+        Version version(start, end);
+        rowset_meta->set_version(version);
+        rowset_meta->set_tablet_id(tablet->tablet_id());
+        rowset_meta->set_tablet_uid(tablet->tablet_uid());
+        rowset_meta->set_rowset_id(_engine->next_rowset_id());
+        return std::make_shared<BetaRowset>(tablet->tablet_schema(), tablet->tablet_path(),
+                                            std::move(rowset_meta));
+    }
+
+    void TearDown() override {
+        // Destroy the engine first, then the data dir.
+        _engine.reset();
+        _data_dir.reset();
+    }
+
+private:
+    std::unique_ptr<StorageEngine> _engine;
+    std::unique_ptr<DataDir> _data_dir;
+};
+
+// Rowsets committed after prepare_compact() must still become inputs:
+// _find_rowset_to_fetch() collects _input_rowsets from the tablet's current
+// rowsets instead of relying on a set picked during prepare_compact().
+TEST_F(SingleCompactionTest, test_single) {
+    TabletSharedPtr tablet = create_tablet(10001);
+
+    SingleReplicaCompaction single_compaction(tablet, CompactionType::CUMULATIVE_COMPACTION);
+    auto st = tablet->init();
+    ASSERT_TRUE(st.ok()) << st;
+    // load 30 single-version rowsets: [1-1] ... [30-30]
+    for (int i = 1; i <= 30; ++i) {
+        auto rs = create_rowset(tablet, i, i);
+        st = tablet->add_inc_rowset(rs);
+        ASSERT_TRUE(st.ok()) << st;
+    }
+
+    // prepare_compact no longer picks input rowsets; it should simply succeed
+    st = single_compaction.prepare_compact();
+    ASSERT_TRUE(st.ok()) << st;
+
+    // load 2 more rowsets after prepare_compact: [31-31], [32-32]
+    for (int i = 31; i <= 32; ++i) {
+        auto rs = create_rowset(tablet, i, i);
+        st = tablet->add_inc_rowset(rs);
+        ASSERT_TRUE(st.ok()) << st;
+    }
+
+    // peer replica has compacted [1-32]; [33-38] has no local counterpart
+    const Version v1(1, 32);
+    const Version v2(33, 38);
+    std::vector<Version> peer_version {v1, v2};
+    Version proper_version;
+    bool find = single_compaction._find_rowset_to_fetch(peer_version, &proper_version);
+    ASSERT_TRUE(find);
+    EXPECT_EQ(proper_version.first, v1.first);
+    EXPECT_EQ(proper_version.second, v1.second);
+    // all 32 local rowsets, including the two loaded after prepare_compact, are inputs.
+    // ASSERT (not EXPECT) so front()/back() below are never called on an empty vector.
+    ASSERT_EQ(single_compaction._input_rowsets.size(), 32U);
+    EXPECT_EQ(single_compaction._input_rowsets.front()->start_version(),
+              single_compaction._output_version.first);
+    EXPECT_EQ(single_compaction._input_rowsets.back()->end_version(),
+              single_compaction._output_version.second);
+}
+
+// The only local rowset [4-6] does not line up with either peer rowset
+// ([3-5], [6-9]), so nothing should be fetched and the compaction's
+// input rowsets and output version must stay at their initial values.
+TEST_F(SingleCompactionTest, test_unmatch) {
+    TabletSharedPtr tablet = create_tablet(10000);
+    SingleReplicaCompaction single_compaction(tablet, CompactionType::CUMULATIVE_COMPACTION);
+
+    Status st = tablet->init();
+    ASSERT_TRUE(st.ok()) << st;
+
+    // single local rowset covering versions [4-6]
+    st = tablet->add_inc_rowset(create_rowset(tablet, 4, 6));
+    ASSERT_TRUE(st.ok()) << st;
+
+    // prepare_compact no longer picks input rowsets, so it should simply succeed
+    st = single_compaction.prepare_compact();
+    ASSERT_TRUE(st.ok()) << st;
+
+    // peer holds compacted rowsets [3-5] and [6-9]
+    std::vector<Version> peer_versions {Version(3, 5), Version(6, 9)};
+    Version fetched_version;
+    const bool found = single_compaction._find_rowset_to_fetch(peer_versions, &fetched_version);
+
+    EXPECT_FALSE(found); // no matched version
+    EXPECT_TRUE(single_compaction._input_rowsets.empty());
+    EXPECT_EQ(single_compaction._output_version.first, 0);
+    EXPECT_EQ(single_compaction._output_version.second, 0);
+}
+
+} // namespace doris
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]