This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new 7e91e69eb91 [fix](compaction) fix single compaction (#33907)
7e91e69eb91 is described below
commit 7e91e69eb91f6a25bc557a9dc2c7cb970b90616a
Author: Sun Chenyang <[email protected]>
AuthorDate: Fri Apr 19 23:30:25 2024 +0800
[fix](compaction) fix single compaction (#33907)
* [fix](compaction) Fix single compaction to get all local versions (#33849)
add tests and comments
* remove input rowset picking from single replica compaction's prepare step
revised
---
be/src/olap/olap_server.cpp | 2 +-
be/src/olap/single_replica_compaction.cpp | 61 +++++--------
be/src/olap/single_replica_compaction.h | 2 +-
be/src/olap/tablet.cpp | 35 +++----
be/src/olap/tablet.h | 5 +-
be/test/olap/single_compaction_test.cpp | 147 ++++++++++++++++++++++++++++++
be/test/olap/tablet_test.cpp | 36 ++++++++
7 files changed, 225 insertions(+), 63 deletions(-)
diff --git a/be/src/olap/olap_server.cpp b/be/src/olap/olap_server.cpp
index a2b5bf56de9..ce805b842dd 100644
--- a/be/src/olap/olap_server.cpp
+++ b/be/src/olap/olap_server.cpp
@@ -800,7 +800,7 @@ void StorageEngine::get_tablet_rowset_versions(const
PGetTabletVersionsRequest*
response->mutable_status()->set_status_code(TStatusCode::CANCELLED);
return;
}
- std::vector<Version> local_versions = tablet->get_all_versions();
+ std::vector<Version> local_versions = tablet->get_all_local_versions();
for (const auto& local_version : local_versions) {
auto version = response->add_versions();
version->set_first(local_version.first);
diff --git a/be/src/olap/single_replica_compaction.cpp
b/be/src/olap/single_replica_compaction.cpp
index 038c3893497..793f9d497c6 100644
--- a/be/src/olap/single_replica_compaction.cpp
+++ b/be/src/olap/single_replica_compaction.cpp
@@ -56,37 +56,12 @@ Status SingleReplicaCompaction::prepare_compact() {
return Status::Error<CUMULATIVE_INVALID_PARAMETERS, false>("_tablet
init failed");
}
- std::unique_lock<std::mutex>
lock_cumu(_tablet->get_cumulative_compaction_lock(),
- std::try_to_lock);
- if (!lock_cumu.owns_lock()) {
- return Status::Error<TRY_LOCK_FAILED, false>(
- "The tablet is under cumulative compaction. tablet={}",
_tablet->tablet_id());
- }
- std::unique_lock<std::mutex>
lock_base(_tablet->get_base_compaction_lock(), std::try_to_lock);
- if (!lock_base.owns_lock()) {
- return Status::Error<TRY_LOCK_FAILED, false>(
- "another base compaction is running. tablet={}",
_tablet->tablet_id());
- }
-
- // 1. pick rowsets to compact
- RETURN_IF_ERROR(pick_rowsets_to_compact());
- if (_input_rowsets.size() == 1) {
- return
Status::Error<CUMULATIVE_NO_SUITABLE_VERSION>("_input_rowsets.size() is 1");
- }
-
+ // Single replica compaction does not require picking _input_rowsets
+ // _input_rowsets depends on the fetched _output_version
return Status::OK();
}
Status SingleReplicaCompaction::pick_rowsets_to_compact() {
- auto candidate_rowsets =
_tablet->pick_candidate_rowsets_to_single_replica_compaction();
- if (candidate_rowsets.empty()) {
- return
Status::Error<CUMULATIVE_NO_SUITABLE_VERSION>("candidate_rowsets is empty");
- }
- _input_rowsets.clear();
- for (const auto& rowset : candidate_rowsets) {
- _input_rowsets.emplace_back(rowset);
- }
-
return Status::OK();
}
@@ -106,10 +81,9 @@ Status SingleReplicaCompaction::execute_compact_impl() {
SCOPED_ATTACH_TASK(_mem_tracker);
- // 2. do single replica compaction
+ // do single replica compaction
RETURN_IF_ERROR(_do_single_replica_compaction());
- // 3. set state to success
_state = CompactionState::SUCCESS;
return Status::OK();
@@ -217,10 +191,8 @@ Status
SingleReplicaCompaction::_get_rowset_verisons_from_peer(
bool SingleReplicaCompaction::_find_rowset_to_fetch(const
std::vector<Version>& peer_versions,
Version* proper_version) {
- // get local versions
- std::vector<Version> local_versions = _tablet->get_all_versions();
- std::sort(local_versions.begin(), local_versions.end(),
- [](const Version& left, const Version& right) { return
left.first < right.first; });
+ // already sorted
+ std::vector<Version> local_versions = _tablet->get_all_local_versions();
for (const auto& v : local_versions) {
VLOG_CRITICAL << _tablet->tablet_id() << " tablet local version: " <<
v.first << " - "
<< v.second;
@@ -271,13 +243,24 @@ bool SingleReplicaCompaction::_find_rowset_to_fetch(const
std::vector<Version>&
}
if (find) {
// 4. reset input rowsets
- auto rs_iter = _input_rowsets.begin();
- while (rs_iter != _input_rowsets.end()) {
- if ((*proper_version).contains((*rs_iter)->version())) {
- ++rs_iter;
- continue;
+ _input_rowsets.clear();
+ _tablet->traverse_rowsets([this, &proper_version](const auto& rs) {
+ // only need rowset in proper_version
+ if (rs->is_local() && proper_version->contains(rs->version())) {
+ this->_input_rowsets.emplace_back(rs);
}
- rs_iter = _input_rowsets.erase(rs_iter);
+ });
+ std::sort(_input_rowsets.begin(), _input_rowsets.end(),
Rowset::comparator);
+ DCHECK_EQ(_input_rowsets.front()->start_version(),
proper_version->first);
+ DCHECK_EQ(_input_rowsets.back()->end_version(),
proper_version->second);
+ if (_input_rowsets.front()->start_version() != proper_version->first ||
+ _input_rowsets.back()->end_version() != proper_version->second) {
+ LOG(WARNING) << fmt::format(
+ "single compaction input rowsets error, tablet_id={},
input rowset = [{}-{}], "
+ "remote rowset = {}",
+ _tablet->tablet_id(),
_input_rowsets.front()->start_version(),
+ _input_rowsets.back()->end_version(),
proper_version->to_string());
+ return false;
}
for (auto& rowset : _input_rowsets) {
_input_rowsets_size += rowset->data_disk_size();
diff --git a/be/src/olap/single_replica_compaction.h
b/be/src/olap/single_replica_compaction.h
index fd35da12645..788ccd35cc9 100644
--- a/be/src/olap/single_replica_compaction.h
+++ b/be/src/olap/single_replica_compaction.h
@@ -48,7 +48,7 @@ protected:
private:
Status _do_single_replica_compaction();
Status _do_single_replica_compaction_impl();
- bool _find_rowset_to_fetch(const std::vector<Version>& peer_versions,
Version* peer_version);
+ bool _find_rowset_to_fetch(const std::vector<Version>& peer_versions,
Version* proper_version);
Status _get_rowset_verisons_from_peer(const TReplicaInfo& addr,
std::vector<Version>* peer_versions);
Status _fetch_rowset(const TReplicaInfo& addr, const std::string& token,
diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp
index e6f421d5ebe..b6d347e27e9 100644
--- a/be/src/olap/tablet.cpp
+++ b/be/src/olap/tablet.cpp
@@ -1297,20 +1297,6 @@ std::vector<RowsetSharedPtr>
Tablet::pick_candidate_rowsets_to_cumulative_compac
return candidate_rowsets;
}
-std::vector<RowsetSharedPtr>
Tablet::pick_candidate_rowsets_to_single_replica_compaction() {
- std::vector<RowsetSharedPtr> candidate_rowsets;
- {
- std::shared_lock rlock(_meta_lock);
- for (const auto& [version, rs] : _rs_version_map) {
- if (rs->is_local()) {
- candidate_rowsets.push_back(rs);
- }
- }
- }
- std::sort(candidate_rowsets.begin(), candidate_rowsets.end(),
Rowset::comparator);
- return candidate_rowsets;
-}
-
std::vector<RowsetSharedPtr>
Tablet::pick_candidate_rowsets_to_base_compaction() {
std::vector<RowsetSharedPtr> candidate_rowsets;
{
@@ -1327,7 +1313,15 @@ std::vector<RowsetSharedPtr>
Tablet::pick_candidate_rowsets_to_base_compaction()
}
std::vector<RowsetSharedPtr>
Tablet::pick_candidate_rowsets_to_full_compaction() {
- return pick_candidate_rowsets_to_single_replica_compaction();
+ std::vector<RowsetSharedPtr> candidate_rowsets;
+ traverse_rowsets([&candidate_rowsets](const auto& rs) {
+ // Do full compaction on all local rowsets.
+ if (rs->is_local()) {
+ candidate_rowsets.emplace_back(rs);
+ }
+ });
+ std::sort(candidate_rowsets.begin(), candidate_rowsets.end(),
Rowset::comparator);
+ return candidate_rowsets;
}
std::vector<RowsetSharedPtr> Tablet::pick_first_consecutive_empty_rowsets(int
limit) {
@@ -1895,13 +1889,14 @@ void
Tablet::execute_single_replica_compaction(SingleReplicaCompaction& compacti
set_last_failure_time(this, compaction, 0);
}
-std::vector<Version> Tablet::get_all_versions() {
+std::vector<Version> Tablet::get_all_local_versions() {
std::vector<Version> local_versions;
{
- std::lock_guard<std::shared_mutex> wrlock(_meta_lock);
- SCOPED_SIMPLE_TRACE_IF_TIMEOUT(TRACE_TABLET_LOCK_THRESHOLD);
- for (const auto& it : _rs_version_map) {
- local_versions.emplace_back(it.first);
+ std::shared_lock rlock(_meta_lock);
+ for (const auto& [version, rs] : _rs_version_map) {
+ if (rs->is_local()) {
+ local_versions.emplace_back(version);
+ }
}
}
std::sort(local_versions.begin(), local_versions.end(),
diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h
index 8b28b57897d..0f4ac7d4d5e 100644
--- a/be/src/olap/tablet.h
+++ b/be/src/olap/tablet.h
@@ -286,8 +286,9 @@ public:
std::vector<RowsetSharedPtr>
pick_candidate_rowsets_to_build_inverted_index(
const std::set<int32_t>& alter_index_uids, bool is_drop_op);
- std::vector<RowsetSharedPtr>
pick_candidate_rowsets_to_single_replica_compaction();
- std::vector<Version> get_all_versions();
+ // used for single compaction to get the local versions
+ // Single compaction does not require remote rowsets and cannot violate
the cooldown semantics
+ std::vector<Version> get_all_local_versions();
std::vector<RowsetSharedPtr> pick_first_consecutive_empty_rowsets(int
limit);
diff --git a/be/test/olap/single_compaction_test.cpp
b/be/test/olap/single_compaction_test.cpp
new file mode 100644
index 00000000000..d909a6668be
--- /dev/null
+++ b/be/test/olap/single_compaction_test.cpp
@@ -0,0 +1,147 @@
+
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <gtest/gtest-message.h>
+#include <gtest/gtest-test-part.h>
+
+#include <memory>
+#include <ostream>
+#include <string>
+#include <vector>
+
+#include "common/status.h"
+#include "gtest/gtest_pred_impl.h"
+#include "olap/data_dir.h"
+#include "olap/olap_common.h"
+#include "olap/rowset/beta_rowset.h"
+#include "olap/rowset/rowset.h"
+#include "olap/rowset/rowset_meta.h"
+#include "olap/single_replica_compaction.h"
+#include "olap/storage_engine.h"
+#include "olap/tablet.h"
+#include "olap/tablet_meta.h"
+namespace doris {
+
+static StorageEngine* engine_ref = nullptr;
+
+class SingleCompactionTest : public ::testing::Test {
+protected:
+ void SetUp() override {
+ const std::string dir_path = "ut_dir/single_compact_test";
+ _engine = new StorageEngine({});
+ _data_dir = new DataDir(*_engine, dir_path);
+ engine_ref = _engine;
+ }
+
+ TabletSharedPtr create_tablet(int64_t tablet_id) {
+ auto tablet_meta = std::make_shared<TabletMeta>();
+ tablet_meta->_tablet_id = tablet_id;
+ (void)tablet_meta->set_partition_id(10000);
+ tablet_meta->set_tablet_uid({tablet_id, 0});
+ tablet_meta->set_shard_id(tablet_id % 4);
+ tablet_meta->_schema_hash = tablet_id;
+ return std::make_shared<Tablet>(*_engine, std::move(tablet_meta),
_data_dir);
+ }
+ auto create_rowset(TabletSharedPtr tablet, int64_t start, int64 end) {
+ auto rowset_meta = std::make_shared<RowsetMeta>();
+ Version version(start, end);
+ rowset_meta->set_version(version);
+ rowset_meta->set_tablet_id(tablet->tablet_id());
+ rowset_meta->set_tablet_uid(tablet->tablet_uid());
+ rowset_meta->set_rowset_id(_engine->next_rowset_id());
+ return std::make_shared<BetaRowset>(tablet->tablet_schema(),
tablet->tablet_path(),
+ std::move(rowset_meta));
+ }
+ void TearDown() override {
+ delete _engine;
+ delete _data_dir;
+ }
+
+private:
+ StorageEngine* _engine;
+ DataDir* _data_dir;
+};
+
+TEST_F(SingleCompactionTest, test_single) {
+ TabletSharedPtr tablet = create_tablet(10001);
+
+ SingleReplicaCompaction single_compaction(*engine_ref, tablet,
+
CompactionType::CUMULATIVE_COMPACTION);
+ auto st = tablet->init();
+ ASSERT_TRUE(st.ok()) << st;
+ // load 30 rowsets
+ for (int i = 1; i <= 30; ++i) {
+ auto rs = create_rowset(tablet, i, i);
+ st = tablet->add_inc_rowset(rs);
+ ASSERT_TRUE(st.ok()) << st;
+ }
+
+ // pick input rowsets, but picking is not needed now
+ st = single_compaction.prepare_compact();
+ ASSERT_TRUE(st.ok()) << st;
+
+ // load 2 rowsets
+ for (int i = 31; i <= 32; i++) {
+ auto rs = create_rowset(tablet, i, i);
+ st = tablet->add_inc_rowset(rs);
+ ASSERT_TRUE(st.ok()) << st;
+ }
+
+ // create peer compacted rowset
+ auto v1 = Version(1, 32);
+ auto v2 = Version(33, 38);
+ std::vector<Version> peer_version {v1, v2};
+ Version proper_version;
+ bool find = single_compaction._find_rowset_to_fetch(peer_version,
&proper_version);
+ EXPECT_EQ(find, true);
+ EXPECT_EQ(single_compaction._input_rowsets.size(), 32);
+ EXPECT_EQ(single_compaction._input_rowsets.front()->start_version(),
+ single_compaction._output_version.first);
+ EXPECT_EQ(single_compaction._input_rowsets.back()->end_version(),
+ single_compaction._output_version.second);
+}
+
+TEST_F(SingleCompactionTest, test_unmatch) {
+ TabletSharedPtr tablet = create_tablet(10000);
+
+ SingleReplicaCompaction single_compaction(*engine_ref, tablet,
+
CompactionType::CUMULATIVE_COMPACTION);
+ auto st = tablet->init();
+ ASSERT_TRUE(st.ok()) << st;
+ // local rowset [4-6]
+ auto rs = create_rowset(tablet, 4, 6);
+ st = tablet->add_inc_rowset(rs);
+ ASSERT_TRUE(st.ok()) << st;
+
+ // pick input rowsets, but picking is not needed now
+ st = single_compaction.prepare_compact();
+ ASSERT_TRUE(st.ok()) << st;
+
+ // create peer compacted rowset [3-5], [6-9]
+ auto v1 = Version(3, 5);
+ auto v2 = Version(6, 9);
+ std::vector<Version> peer_version {v1, v2};
+ Version proper_version;
+ bool find = single_compaction._find_rowset_to_fetch(peer_version,
&proper_version);
+ EXPECT_EQ(find, false); // no matched version, find = false
+ EXPECT_EQ(single_compaction._input_rowsets.size(), 0);
+ EXPECT_EQ(single_compaction._output_version.first, 0);
+ EXPECT_EQ(single_compaction._output_version.second, 0);
+}
+
+} // namespace doris
diff --git a/be/test/olap/tablet_test.cpp b/be/test/olap/tablet_test.cpp
index f5778d47982..8d84b5141c3 100644
--- a/be/test/olap/tablet_test.cpp
+++ b/be/test/olap/tablet_test.cpp
@@ -148,6 +148,20 @@ public:
pb1->set_tablet_schema(_tablet_meta->tablet_schema());
}
+ void init_rs_meta_resource(RowsetMetaSharedPtr& pb1, int64_t start,
int64_t end,
+ bool is_local) {
+ RowsetMetaPB rowset_meta_pb;
+ json2pb::JsonToProtoMessage(_json_rowset_meta, &rowset_meta_pb);
+ rowset_meta_pb.set_start_version(start);
+ rowset_meta_pb.set_end_version(end);
+ rowset_meta_pb.set_creation_time(10000);
+ if (!is_local) {
+ rowset_meta_pb.set_resource_id("100");
+ }
+ pb1->init_from_pb(rowset_meta_pb);
+ pb1->set_tablet_schema(_tablet_meta->tablet_schema());
+ }
+
void init_all_rs_meta(std::vector<RowsetMetaSharedPtr>* rs_metas) {
RowsetMetaSharedPtr ptr1(new RowsetMeta());
init_rs_meta(ptr1, 0, 0);
@@ -398,4 +412,26 @@ TEST_F(TestTablet, cooldown_policy) {
}
}
+TEST_F(TestTablet, get_local_versions) {
+ // 10 remote rowsets
+ for (int i = 1; i <= 10; i++) {
+ auto ptr = std::make_shared<RowsetMeta>();
+ init_rs_meta_resource(ptr, i, i, false);
+ static_cast<void>(_tablet_meta->add_rs_meta(ptr));
+ }
+
+ // 20 local rowsets
+ for (int i = 11; i <= 30; i++) {
+ auto ptr = std::make_shared<RowsetMeta>();
+ init_rs_meta_resource(ptr, i, i, true);
+ static_cast<void>(_tablet_meta->add_rs_meta(ptr));
+ }
+
+ static_cast<void>(_data_dir->init());
+ TabletSharedPtr _tablet(new Tablet(*k_engine, _tablet_meta,
_data_dir.get()));
+ static_cast<void>(_tablet->init());
+ const auto& local_versions = _tablet->get_all_local_versions();
+ ASSERT_EQ(local_versions.size(), 20);
+}
+
} // namespace doris
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]