This is an automated email from the ASF dual-hosted git repository.

kxiao pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.0 by this push:
     new 110a7072ed4  [fix](compaction) remove single replica compaction prepare input rowsets #33053 (#33478)
110a7072ed4 is described below

commit 110a7072ed4655a39e60cd664d660a0147546058
Author: Sun Chenyang <csun5...@gmail.com>
AuthorDate: Thu Apr 11 09:49:37 2024 +0800

     [fix](compaction) remove single replica compaction prepare input rowsets #33053 (#33478)
---
 be/src/olap/single_replica_compaction.cpp |  61 +++++--------
 be/src/olap/single_replica_compaction.h   |   2 +-
 be/src/olap/tablet.cpp                    |  24 ++---
 be/src/olap/tablet.h                      |   1 -
 be/test/olap/single_compaction_test.cpp   | 142 ++++++++++++++++++++++++++++++
 5 files changed, 173 insertions(+), 57 deletions(-)

diff --git a/be/src/olap/single_replica_compaction.cpp 
b/be/src/olap/single_replica_compaction.cpp
index 1bbdf95be55..ddd6e08bfa2 100644
--- a/be/src/olap/single_replica_compaction.cpp
+++ b/be/src/olap/single_replica_compaction.cpp
@@ -56,39 +56,12 @@ Status SingleReplicaCompaction::prepare_compact() {
         return Status::Error<CUMULATIVE_INVALID_PARAMETERS, false>("_tablet 
init failed");
     }
 
-    std::unique_lock<std::mutex> 
lock_cumu(_tablet->get_cumulative_compaction_lock(),
-                                           std::try_to_lock);
-    if (!lock_cumu.owns_lock()) {
-        LOG(INFO) << "The tablet is under cumulative compaction. tablet=" << 
_tablet->full_name();
-        return Status::Error<TRY_LOCK_FAILED, false>(
-                "The tablet is under cumulative compaction. tablet={}", 
_tablet->full_name());
-    }
-    std::unique_lock<std::mutex> 
lock_base(_tablet->get_base_compaction_lock(), std::try_to_lock);
-    if (!lock_base.owns_lock()) {
-        LOG(WARNING) << "another base compaction is running. tablet=" << 
_tablet->full_name();
-        return Status::Error<TRY_LOCK_FAILED, false>(
-                "another base compaction is running. tablet={}", 
_tablet->full_name());
-    }
-
-    // 1. pick rowsets to compact
-    RETURN_IF_ERROR(pick_rowsets_to_compact());
-    if (_input_rowsets.size() == 1) {
-        return 
Status::Error<CUMULATIVE_NO_SUITABLE_VERSION>("_input_rowsets.size() is 1");
-    }
-
+    // Single replica compaction does not require picking _input_rowsets
+    // _input_rowsets depends on the fetched _output_version
     return Status::OK();
 }
 
 Status SingleReplicaCompaction::pick_rowsets_to_compact() {
-    auto candidate_rowsets = 
_tablet->pick_candidate_rowsets_to_single_replica_compaction();
-    if (candidate_rowsets.empty()) {
-        return 
Status::Error<CUMULATIVE_NO_SUITABLE_VERSION>("candidate_rowsets is empty");
-    }
-    _input_rowsets.clear();
-    for (const auto& rowset : candidate_rowsets) {
-        _input_rowsets.emplace_back(rowset);
-    }
-
     return Status::OK();
 }
 
@@ -109,10 +82,9 @@ Status SingleReplicaCompaction::execute_compact_impl() {
 
     SCOPED_ATTACH_TASK(_mem_tracker);
 
-    // 2. do single replica compaction
+    // do single replica compaction
     RETURN_IF_ERROR(_do_single_replica_compaction());
 
-    // 3. set state to success
     _state = CompactionState::SUCCESS;
 
     return Status::OK();
@@ -220,10 +192,8 @@ Status 
SingleReplicaCompaction::_get_rowset_verisons_from_peer(
 
 bool SingleReplicaCompaction::_find_rowset_to_fetch(const 
std::vector<Version>& peer_versions,
                                                     Version* proper_version) {
-    //  get local versions
+    //  already sorted
     std::vector<Version> local_versions = _tablet->get_all_versions();
-    std::sort(local_versions.begin(), local_versions.end(),
-              [](const Version& left, const Version& right) { return 
left.first < right.first; });
     for (const auto& v : local_versions) {
         VLOG_CRITICAL << _tablet->tablet_id() << " tablet local version: " << 
v.first << " - "
                       << v.second;
@@ -274,13 +244,24 @@ bool SingleReplicaCompaction::_find_rowset_to_fetch(const 
std::vector<Version>&
     }
     if (find) {
         //  4. reset input rowsets
-        auto rs_iter = _input_rowsets.begin();
-        while (rs_iter != _input_rowsets.end()) {
-            if ((*proper_version).contains((*rs_iter)->version())) {
-                ++rs_iter;
-                continue;
+        _input_rowsets.clear();
+        _tablet->traverse_rowsets([this, &proper_version](const auto& rs) {
+            // only need rowset in proper_version
+            if (rs->is_local() && proper_version->contains(rs->version())) {
+                this->_input_rowsets.emplace_back(rs);
             }
-            rs_iter = _input_rowsets.erase(rs_iter);
+        });
+        std::sort(_input_rowsets.begin(), _input_rowsets.end(), 
Rowset::comparator);
+        DCHECK_EQ(_input_rowsets.front()->start_version(), 
proper_version->first);
+        DCHECK_EQ(_input_rowsets.back()->end_version(), 
proper_version->second);
+        if (_input_rowsets.front()->start_version() != proper_version->first ||
+            _input_rowsets.back()->end_version() != proper_version->second) {
+            LOG(WARNING) << fmt::format(
+                    "single compaction input rowsets error, tablet_id={}, 
input rowset = [{}-{}], "
+                    "remote rowset = {}",
+                    _tablet->tablet_id(), 
_input_rowsets.front()->start_version(),
+                    _input_rowsets.back()->end_version(), 
proper_version->to_string());
+            return false;
         }
         for (auto& rowset : _input_rowsets) {
             _input_rowsets_size += rowset->data_disk_size();
diff --git a/be/src/olap/single_replica_compaction.h 
b/be/src/olap/single_replica_compaction.h
index 3d967dff20b..ae013b3748d 100644
--- a/be/src/olap/single_replica_compaction.h
+++ b/be/src/olap/single_replica_compaction.h
@@ -47,7 +47,7 @@ protected:
 private:
     Status _do_single_replica_compaction();
     Status _do_single_replica_compaction_impl();
-    bool _find_rowset_to_fetch(const std::vector<Version>& peer_versions, 
Version* peer_version);
+    bool _find_rowset_to_fetch(const std::vector<Version>& peer_versions, 
Version* proper_version);
     Status _get_rowset_verisons_from_peer(const TReplicaInfo& addr,
                                           std::vector<Version>* peer_versions);
     Status _fetch_rowset(const TReplicaInfo& addr, const std::string& token,
diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp
index 9b745a92479..94aee96a260 100644
--- a/be/src/olap/tablet.cpp
+++ b/be/src/olap/tablet.cpp
@@ -1334,20 +1334,6 @@ std::vector<RowsetSharedPtr> 
Tablet::pick_candidate_rowsets_to_cumulative_compac
     return candidate_rowsets;
 }
 
-std::vector<RowsetSharedPtr> 
Tablet::pick_candidate_rowsets_to_single_replica_compaction() {
-    std::vector<RowsetSharedPtr> candidate_rowsets;
-    {
-        std::shared_lock rlock(_meta_lock);
-        for (const auto& [version, rs] : _rs_version_map) {
-            if (rs->is_local()) {
-                candidate_rowsets.push_back(rs);
-            }
-        }
-    }
-    std::sort(candidate_rowsets.begin(), candidate_rowsets.end(), 
Rowset::comparator);
-    return candidate_rowsets;
-}
-
 std::vector<RowsetSharedPtr> 
Tablet::pick_candidate_rowsets_to_base_compaction() {
     std::vector<RowsetSharedPtr> candidate_rowsets;
     {
@@ -1364,7 +1350,15 @@ std::vector<RowsetSharedPtr> 
Tablet::pick_candidate_rowsets_to_base_compaction()
 }
 
 std::vector<RowsetSharedPtr> 
Tablet::pick_candidate_rowsets_to_full_compaction() {
-    return pick_candidate_rowsets_to_single_replica_compaction();
+    std::vector<RowsetSharedPtr> candidate_rowsets;
+    traverse_rowsets([&candidate_rowsets](const auto& rs) {
+        // Do full compaction on all local rowsets.
+        if (rs->is_local()) {
+            candidate_rowsets.emplace_back(rs);
+        }
+    });
+    std::sort(candidate_rowsets.begin(), candidate_rowsets.end(), 
Rowset::comparator);
+    return candidate_rowsets;
 }
 
 std::vector<RowsetSharedPtr> Tablet::pick_first_consecutive_empty_rowsets(int 
limit) {
diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h
index d94990e4071..6201924e944 100644
--- a/be/src/olap/tablet.h
+++ b/be/src/olap/tablet.h
@@ -273,7 +273,6 @@ public:
     std::vector<RowsetSharedPtr> 
pick_candidate_rowsets_to_build_inverted_index(
             const std::set<int32_t>& alter_index_uids, bool is_drop_op);
 
-    std::vector<RowsetSharedPtr> 
pick_candidate_rowsets_to_single_replica_compaction();
     std::vector<Version> get_all_versions();
 
     std::vector<RowsetSharedPtr> pick_first_consecutive_empty_rowsets(int 
limit);
diff --git a/be/test/olap/single_compaction_test.cpp 
b/be/test/olap/single_compaction_test.cpp
new file mode 100644
index 00000000000..b190bf9e2a9
--- /dev/null
+++ b/be/test/olap/single_compaction_test.cpp
@@ -0,0 +1,142 @@
+
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <gtest/gtest-message.h>
+#include <gtest/gtest-test-part.h>
+
+#include <memory>
+#include <ostream>
+#include <string>
+#include <vector>
+
+#include "common/status.h"
+#include "gtest/gtest_pred_impl.h"
+#include "olap/data_dir.h"
+#include "olap/olap_common.h"
+#include "olap/rowset/beta_rowset.h"
+#include "olap/rowset/rowset.h"
+#include "olap/rowset/rowset_meta.h"
+#include "olap/single_replica_compaction.h"
+#include "olap/storage_engine.h"
+#include "olap/tablet.h"
+#include "olap/tablet_meta.h"
+namespace doris {
+
+class SingleCompactionTest : public ::testing::Test {
+protected:
+    void SetUp() override {
+        const std::string dir_path = "ut_dir/single_compact_test";
+        _engine = new StorageEngine({});
+        _data_dir = new DataDir(dir_path);
+    }
+
+    TabletSharedPtr create_tablet(int64_t tablet_id) {
+        auto tablet_meta = std::make_shared<TabletMeta>();
+        tablet_meta->_tablet_id = tablet_id;
+        (void)tablet_meta->set_partition_id(10000);
+        tablet_meta->set_tablet_uid({tablet_id, 0});
+        tablet_meta->set_shard_id(tablet_id % 4);
+        tablet_meta->_schema_hash = tablet_id;
+        return std::make_shared<Tablet>(std::move(tablet_meta), _data_dir);
+    }
+    auto create_rowset(TabletSharedPtr tablet, int64_t start, int64 end) {
+        auto rowset_meta = std::make_shared<RowsetMeta>();
+        Version version(start, end);
+        rowset_meta->set_version(version);
+        rowset_meta->set_tablet_id(tablet->tablet_id());
+        rowset_meta->set_tablet_uid(tablet->tablet_uid());
+        rowset_meta->set_rowset_id(_engine->next_rowset_id());
+        return std::make_shared<BetaRowset>(tablet->tablet_schema(), 
tablet->tablet_path(),
+                                            std::move(rowset_meta));
+    }
+    void TearDown() override {
+        delete _engine;
+        delete _data_dir;
+    }
+
+private:
+    StorageEngine* _engine;
+    DataDir* _data_dir;
+};
+
+TEST_F(SingleCompactionTest, test_single) {
+    TabletSharedPtr tablet = create_tablet(10001);
+
+    SingleReplicaCompaction single_compaction(tablet, 
CompactionType::CUMULATIVE_COMPACTION);
+    auto st = tablet->init();
+    ASSERT_TRUE(st.ok()) << st;
+    // load 30 rowsets
+    for (int i = 1; i <= 30; ++i) {
+        auto rs = create_rowset(tablet, i, i);
+        st = tablet->add_inc_rowset(rs);
+        ASSERT_TRUE(st.ok()) << st;
+    }
+
+    // pick input rowsets, but picking is not needed now
+    st = single_compaction.prepare_compact();
+    ASSERT_TRUE(st.ok()) << st;
+
+    // load 2 rowsets
+    for (int i = 31; i <= 32; i++) {
+        auto rs = create_rowset(tablet, i, i);
+        st = tablet->add_inc_rowset(rs);
+        ASSERT_TRUE(st.ok()) << st;
+    }
+
+    // create peer compacted rowset
+    auto v1 = Version(1, 32);
+    auto v2 = Version(33, 38);
+    std::vector<Version> peer_version {v1, v2};
+    Version proper_version;
+    bool find = single_compaction._find_rowset_to_fetch(peer_version, 
&proper_version);
+    EXPECT_EQ(find, true);
+    EXPECT_EQ(single_compaction._input_rowsets.size(), 32);
+    EXPECT_EQ(single_compaction._input_rowsets.front()->start_version(),
+              single_compaction._output_version.first);
+    EXPECT_EQ(single_compaction._input_rowsets.back()->end_version(),
+              single_compaction._output_version.second);
+}
+
+TEST_F(SingleCompactionTest, test_unmatch) {
+    TabletSharedPtr tablet = create_tablet(10000);
+
+    SingleReplicaCompaction single_compaction(tablet, 
CompactionType::CUMULATIVE_COMPACTION);
+    auto st = tablet->init();
+    ASSERT_TRUE(st.ok()) << st;
+    // local rowset [4-6]
+    auto rs = create_rowset(tablet, 4, 6);
+    st = tablet->add_inc_rowset(rs);
+    ASSERT_TRUE(st.ok()) << st;
+
+    // pick input rowsets, but picking is not needed now
+    st = single_compaction.prepare_compact();
+    ASSERT_TRUE(st.ok()) << st;
+
+    // create peer compacted rowset [3-5], [6-9]
+    auto v1 = Version(3, 5);
+    auto v2 = Version(6, 9);
+    std::vector<Version> peer_version {v1, v2};
+    Version proper_version;
+    bool find = single_compaction._find_rowset_to_fetch(peer_version, 
&proper_version);
+    EXPECT_EQ(find, false); // no matched version, find = false
+    EXPECT_EQ(single_compaction._input_rowsets.size(), 0);
+    EXPECT_EQ(single_compaction._output_version.first, 0);
+    EXPECT_EQ(single_compaction._output_version.second, 0);
+}
+
+} // namespace doris


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to