This is an automated email from the ASF dual-hosted git repository. kxiao pushed a commit to branch branch-2.0 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.0 by this push: new 110a7072ed4 [fix](compaction) remove single replica compaction prepare input rowsets #33053 (#33478) 110a7072ed4 is described below commit 110a7072ed4655a39e60cd664d660a0147546058 Author: Sun Chenyang <csun5...@gmail.com> AuthorDate: Thu Apr 11 09:49:37 2024 +0800 [fix](compaction) remove single replica compaction prepare input rowsets #33053 (#33478) --- be/src/olap/single_replica_compaction.cpp | 61 +++++-------- be/src/olap/single_replica_compaction.h | 2 +- be/src/olap/tablet.cpp | 24 ++--- be/src/olap/tablet.h | 1 - be/test/olap/single_compaction_test.cpp | 142 ++++++++++++++++++++++++++++++ 5 files changed, 173 insertions(+), 57 deletions(-) diff --git a/be/src/olap/single_replica_compaction.cpp b/be/src/olap/single_replica_compaction.cpp index 1bbdf95be55..ddd6e08bfa2 100644 --- a/be/src/olap/single_replica_compaction.cpp +++ b/be/src/olap/single_replica_compaction.cpp @@ -56,39 +56,12 @@ Status SingleReplicaCompaction::prepare_compact() { return Status::Error<CUMULATIVE_INVALID_PARAMETERS, false>("_tablet init failed"); } - std::unique_lock<std::mutex> lock_cumu(_tablet->get_cumulative_compaction_lock(), - std::try_to_lock); - if (!lock_cumu.owns_lock()) { - LOG(INFO) << "The tablet is under cumulative compaction. tablet=" << _tablet->full_name(); - return Status::Error<TRY_LOCK_FAILED, false>( - "The tablet is under cumulative compaction. tablet={}", _tablet->full_name()); - } - std::unique_lock<std::mutex> lock_base(_tablet->get_base_compaction_lock(), std::try_to_lock); - if (!lock_base.owns_lock()) { - LOG(WARNING) << "another base compaction is running. tablet=" << _tablet->full_name(); - return Status::Error<TRY_LOCK_FAILED, false>( - "another base compaction is running. tablet={}", _tablet->full_name()); - } - - // 1. pick rowsets to compact - RETURN_IF_ERROR(pick_rowsets_to_compact()); - if (_input_rowsets.size() == 1) { - return Status::Error<CUMULATIVE_NO_SUITABLE_VERSION>("_input_rowsets.size() is 1"); - } - + // Single replica compaction does not require picking _input_rowsets + // _input_rowsets depends on the fetched _output_version return Status::OK(); } Status SingleReplicaCompaction::pick_rowsets_to_compact() { - auto candidate_rowsets = _tablet->pick_candidate_rowsets_to_single_replica_compaction(); - if (candidate_rowsets.empty()) { - return Status::Error<CUMULATIVE_NO_SUITABLE_VERSION>("candidate_rowsets is empty"); - } - _input_rowsets.clear(); - for (const auto& rowset : candidate_rowsets) { - _input_rowsets.emplace_back(rowset); - } - return Status::OK(); } @@ -109,10 +82,9 @@ Status SingleReplicaCompaction::execute_compact_impl() { SCOPED_ATTACH_TASK(_mem_tracker); - // 2. do single replica compaction + // do single replica compaction RETURN_IF_ERROR(_do_single_replica_compaction()); - // 3. set state to success _state = CompactionState::SUCCESS; return Status::OK(); @@ -220,10 +192,8 @@ Status SingleReplicaCompaction::_get_rowset_verisons_from_peer( bool SingleReplicaCompaction::_find_rowset_to_fetch(const std::vector<Version>& peer_versions, Version* proper_version) { - // get local versions + // already sorted std::vector<Version> local_versions = _tablet->get_all_versions(); - std::sort(local_versions.begin(), local_versions.end(), - [](const Version& left, const Version& right) { return left.first < right.first; }); for (const auto& v : local_versions) { VLOG_CRITICAL << _tablet->tablet_id() << " tablet local version: " << v.first << " - " << v.second; @@ -274,13 +244,24 @@ bool SingleReplicaCompaction::_find_rowset_to_fetch(const std::vector<Version>& } if (find) { // 4. reset input rowsets - auto rs_iter = _input_rowsets.begin(); - while (rs_iter != _input_rowsets.end()) { - if ((*proper_version).contains((*rs_iter)->version())) { - ++rs_iter; - continue; + _input_rowsets.clear(); + _tablet->traverse_rowsets([this, &proper_version](const auto& rs) { + // only need rowset in proper_version + if (rs->is_local() && proper_version->contains(rs->version())) { + this->_input_rowsets.emplace_back(rs); } - rs_iter = _input_rowsets.erase(rs_iter); + }); + std::sort(_input_rowsets.begin(), _input_rowsets.end(), Rowset::comparator); + DCHECK_EQ(_input_rowsets.front()->start_version(), proper_version->first); + DCHECK_EQ(_input_rowsets.back()->end_version(), proper_version->second); + if (_input_rowsets.front()->start_version() != proper_version->first || + _input_rowsets.back()->end_version() != proper_version->second) { + LOG(WARNING) << fmt::format( + "single compaction input rowsets error, tablet_id={}, input rowset = [{}-{}], " + "remote rowset = {}", + _tablet->tablet_id(), _input_rowsets.front()->start_version(), + _input_rowsets.back()->end_version(), proper_version->to_string()); + return false; } for (auto& rowset : _input_rowsets) { _input_rowsets_size += rowset->data_disk_size(); diff --git a/be/src/olap/single_replica_compaction.h b/be/src/olap/single_replica_compaction.h index 3d967dff20b..ae013b3748d 100644 --- a/be/src/olap/single_replica_compaction.h +++ b/be/src/olap/single_replica_compaction.h @@ -47,7 +47,7 @@ protected: private: Status _do_single_replica_compaction(); Status _do_single_replica_compaction_impl(); - bool _find_rowset_to_fetch(const std::vector<Version>& peer_versions, Version* peer_version); + bool _find_rowset_to_fetch(const std::vector<Version>& peer_versions, Version* proper_version); Status _get_rowset_verisons_from_peer(const TReplicaInfo& addr, std::vector<Version>* peer_versions); Status _fetch_rowset(const TReplicaInfo& addr, const std::string& token, diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp index 9b745a92479..94aee96a260 100644 --- a/be/src/olap/tablet.cpp +++ b/be/src/olap/tablet.cpp @@ -1334,20 +1334,6 @@ std::vector<RowsetSharedPtr> Tablet::pick_candidate_rowsets_to_cumulative_compac return candidate_rowsets; } -std::vector<RowsetSharedPtr> Tablet::pick_candidate_rowsets_to_single_replica_compaction() { - std::vector<RowsetSharedPtr> candidate_rowsets; - { - std::shared_lock rlock(_meta_lock); - for (const auto& [version, rs] : _rs_version_map) { - if (rs->is_local()) { - candidate_rowsets.push_back(rs); - } - } - } - std::sort(candidate_rowsets.begin(), candidate_rowsets.end(), Rowset::comparator); - return candidate_rowsets; -} - std::vector<RowsetSharedPtr> Tablet::pick_candidate_rowsets_to_base_compaction() { std::vector<RowsetSharedPtr> candidate_rowsets; { @@ -1364,7 +1350,15 @@ std::vector<RowsetSharedPtr> Tablet::pick_candidate_rowsets_to_base_compaction() } std::vector<RowsetSharedPtr> Tablet::pick_candidate_rowsets_to_full_compaction() { - return pick_candidate_rowsets_to_single_replica_compaction(); + std::vector<RowsetSharedPtr> candidate_rowsets; + traverse_rowsets([&candidate_rowsets](const auto& rs) { + // Do full compaction on all local rowsets. + if (rs->is_local()) { + candidate_rowsets.emplace_back(rs); + } + }); + std::sort(candidate_rowsets.begin(), candidate_rowsets.end(), Rowset::comparator); + return candidate_rowsets; } std::vector<RowsetSharedPtr> Tablet::pick_first_consecutive_empty_rowsets(int limit) { diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h index d94990e4071..6201924e944 100644 --- a/be/src/olap/tablet.h +++ b/be/src/olap/tablet.h @@ -273,7 +273,6 @@ public: std::vector<RowsetSharedPtr> pick_candidate_rowsets_to_build_inverted_index( const std::set<int32_t>& alter_index_uids, bool is_drop_op); - std::vector<RowsetSharedPtr> pick_candidate_rowsets_to_single_replica_compaction(); std::vector<Version> get_all_versions(); std::vector<RowsetSharedPtr> pick_first_consecutive_empty_rowsets(int limit); diff --git a/be/test/olap/single_compaction_test.cpp b/be/test/olap/single_compaction_test.cpp new file mode 100644 index 00000000000..b190bf9e2a9 --- /dev/null +++ b/be/test/olap/single_compaction_test.cpp @@ -0,0 +1,142 @@ + +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include <gtest/gtest-message.h> +#include <gtest/gtest-test-part.h> + +#include <memory> +#include <ostream> +#include <string> +#include <vector> + +#include "common/status.h" +#include "gtest/gtest_pred_impl.h" +#include "olap/data_dir.h" +#include "olap/olap_common.h" +#include "olap/rowset/beta_rowset.h" +#include "olap/rowset/rowset.h" +#include "olap/rowset/rowset_meta.h" +#include "olap/single_replica_compaction.h" +#include "olap/storage_engine.h" +#include "olap/tablet.h" +#include "olap/tablet_meta.h" +namespace doris { + +class SingleCompactionTest : public ::testing::Test { +protected: + void SetUp() override { + const std::string dir_path = "ut_dir/single_compact_test"; + _engine = new StorageEngine({}); + _data_dir = new DataDir(dir_path); + } + + TabletSharedPtr create_tablet(int64_t tablet_id) { + auto tablet_meta = std::make_shared<TabletMeta>(); + tablet_meta->_tablet_id = tablet_id; + (void)tablet_meta->set_partition_id(10000); + tablet_meta->set_tablet_uid({tablet_id, 0}); + tablet_meta->set_shard_id(tablet_id % 4); + tablet_meta->_schema_hash = tablet_id; + return std::make_shared<Tablet>(std::move(tablet_meta), _data_dir); + } + auto create_rowset(TabletSharedPtr tablet, int64_t start, int64 end) { + auto rowset_meta = std::make_shared<RowsetMeta>(); + Version version(start, end); + rowset_meta->set_version(version); + rowset_meta->set_tablet_id(tablet->tablet_id()); + rowset_meta->set_tablet_uid(tablet->tablet_uid()); + rowset_meta->set_rowset_id(_engine->next_rowset_id()); + return std::make_shared<BetaRowset>(tablet->tablet_schema(), tablet->tablet_path(), + std::move(rowset_meta)); + } + void TearDown() override { + delete _engine; + delete _data_dir; + } + +private: + StorageEngine* _engine; + DataDir* _data_dir; +}; + +TEST_F(SingleCompactionTest, test_single) { + TabletSharedPtr tablet = create_tablet(10001); + + SingleReplicaCompaction single_compaction(tablet, CompactionType::CUMULATIVE_COMPACTION); + auto st = tablet->init(); + ASSERT_TRUE(st.ok()) << st; + // load 30 rowsets + for (int i = 1; i <= 30; ++i) { + auto rs = create_rowset(tablet, i, i); + st = tablet->add_inc_rowset(rs); + ASSERT_TRUE(st.ok()) << st; + } + + // pick input rowsets, but picking is not needed now + st = single_compaction.prepare_compact(); + ASSERT_TRUE(st.ok()) << st; + + // load 2 rowsets + for (int i = 31; i <= 32; i++) { + auto rs = create_rowset(tablet, i, i); + st = tablet->add_inc_rowset(rs); + ASSERT_TRUE(st.ok()) << st; + } + + // create peer compacted rowset + auto v1 = Version(1, 32); + auto v2 = Version(33, 38); + std::vector<Version> peer_version {v1, v2}; + Version proper_version; + bool find = single_compaction._find_rowset_to_fetch(peer_version, &proper_version); + EXPECT_EQ(find, true); + EXPECT_EQ(single_compaction._input_rowsets.size(), 32); + EXPECT_EQ(single_compaction._input_rowsets.front()->start_version(), + single_compaction._output_version.first); + EXPECT_EQ(single_compaction._input_rowsets.back()->end_version(), + single_compaction._output_version.second); +} + +TEST_F(SingleCompactionTest, test_unmatch) { + TabletSharedPtr tablet = create_tablet(10000); + + SingleReplicaCompaction single_compaction(tablet, CompactionType::CUMULATIVE_COMPACTION); + auto st = tablet->init(); + ASSERT_TRUE(st.ok()) << st; + // local rowset [4-6] + auto rs = create_rowset(tablet, 4, 6); + st = tablet->add_inc_rowset(rs); + ASSERT_TRUE(st.ok()) << st; + + // pick input rowsets, but picking is not needed now + st = single_compaction.prepare_compact(); + ASSERT_TRUE(st.ok()) << st; + + // create peer compacted rowset [3-5], [6-9] + auto v1 = Version(3, 5); + auto v2 = Version(6, 9); + std::vector<Version> peer_version {v1, v2}; + Version proper_version; + bool find = single_compaction._find_rowset_to_fetch(peer_version, &proper_version); + EXPECT_EQ(find, false); // no matched version, find = false + EXPECT_EQ(single_compaction._input_rowsets.size(), 0); + EXPECT_EQ(single_compaction._output_version.first, 0); + EXPECT_EQ(single_compaction._output_version.second, 0); +} + +} // namespace doris --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org