This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new ec5471f048 [feature-wip](unique-key-merge-on-write) Implement tablet lookup interface, using rowset-tree, DSIP-018[3/5] (#10938)
ec5471f048 is described below

commit ec5471f048f85fd8237c128acc9cc06cce9fe82c
Author: zhannngchen <[email protected]>
AuthorDate: Wed Jul 20 14:52:14 2022 +0800

    [feature-wip](unique-key-merge-on-write) Implement tablet lookup interface, using rowset-tree, DSIP-018[3/5] (#10938)
---
 be/src/olap/tablet.cpp       | 76 +++++++++++++++++++++++++++++++++++++++--
 be/src/olap/tablet.h         |  6 +++-
 be/test/olap/tablet_test.cpp | 81 ++++++++++++++++++++++++++++++++++++++++++--
 3 files changed, 156 insertions(+), 7 deletions(-)

diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp
index ec2fa0050f..bf71ed1e28 100644
--- a/be/src/olap/tablet.cpp
+++ b/be/src/olap/tablet.cpp
@@ -52,6 +52,7 @@
 #include "olap/storage_policy_mgr.h"
 #include "olap/tablet_meta.h"
 #include "olap/tablet_meta_manager.h"
+#include "olap/segment_loader.h"
 #include "util/path_util.h"
 #include "util/pretty_printer.h"
 #include "util/scoped_cleanup.h"
@@ -111,6 +112,7 @@ Status Tablet::_init_once_action() {
                     _cumulative_compaction_type);
 #endif
 
+    RowsetVector rowset_vec;
     for (const auto& rs_meta : _tablet_meta->all_rs_metas()) {
         Version version = rs_meta->version();
         RowsetSharedPtr rowset;
@@ -121,6 +123,7 @@ Status Tablet::_init_once_action() {
                          << ", res=" << res;
             return res;
         }
+        rowset_vec.push_back(rowset);
         _rs_version_map[version] = std::move(rowset);
     }
 
@@ -138,6 +141,10 @@ Status Tablet::_init_once_action() {
         _stale_rs_version_map[version] = std::move(rowset);
     }
 
+    if (_schema.keys_type() == UNIQUE_KEYS && 
enable_unique_key_merge_on_write()) {
+        _rowset_tree = std::make_unique<RowsetTree>();
+        res = _rowset_tree->Init(rowset_vec);
+    }
     return res;
 }
 
@@ -235,6 +242,13 @@ Status Tablet::add_rowset(RowsetSharedPtr rowset) {
     _rs_version_map[rowset->version()] = rowset;
     _timestamped_version_tracker.add_version(rowset->version());
 
+    // Update rowset tree
+    if (keys_type() == UNIQUE_KEYS && enable_unique_key_merge_on_write()) {
+        auto new_rowset_tree = std::make_unique<RowsetTree>();
+        ModifyRowSetTree(*_rowset_tree, {}, {rowset}, new_rowset_tree.get());
+        _rowset_tree = std::move(new_rowset_tree);
+    }
+
     std::vector<RowsetSharedPtr> rowsets_to_delete;
     // yiguolei: temp code, should remove the rowset contains by this rowset
     // but it should be removed in multi path version
@@ -267,6 +281,10 @@ Status 
Tablet::modify_rowsets(std::vector<RowsetSharedPtr>& to_add,
     // In this case, we no longer need to add the rowset in "to_delete" to
     // _stale_rs_version_map, but can delete it directly.
 
+    if (to_add.empty() && to_delete.empty()) {
+        return Status::OK();
+    }
+
     bool same_version = true;
     std::sort(to_add.begin(), to_add.end(), Rowset::comparator);
     std::sort(to_delete.begin(), to_delete.end(), Rowset::comparator);
@@ -323,6 +341,13 @@ Status 
Tablet::modify_rowsets(std::vector<RowsetSharedPtr>& to_add,
 
     _tablet_meta->modify_rs_metas(rs_metas_to_add, rs_metas_to_delete, 
same_version);
 
+    // Update rowset tree
+    if (keys_type() == UNIQUE_KEYS && enable_unique_key_merge_on_write()) {
+        auto new_rowset_tree = std::make_unique<RowsetTree>();
+        ModifyRowSetTree(*_rowset_tree, to_delete, to_add, 
new_rowset_tree.get());
+        _rowset_tree = std::move(new_rowset_tree);
+    }
+
     if (!same_version) {
         // add rs_metas_to_delete to tracker
         
_timestamped_version_tracker.add_stale_path_version(rs_metas_to_delete);
@@ -410,6 +435,13 @@ Status Tablet::add_inc_rowset(const RowsetSharedPtr& 
rowset) {
     RETURN_NOT_OK(_tablet_meta->add_rs_meta(rowset->rowset_meta()));
     _rs_version_map[rowset->version()] = rowset;
 
+    // Update rowset tree
+    if (keys_type() == UNIQUE_KEYS && enable_unique_key_merge_on_write()) {
+        auto new_rowset_tree = std::make_unique<RowsetTree>();
+        ModifyRowSetTree(*_rowset_tree, {}, {rowset}, new_rowset_tree.get());
+        _rowset_tree = std::move(new_rowset_tree);
+    }
+
     _timestamped_version_tracker.add_version(rowset->version());
 
     ++_newly_created_rowset_num;
@@ -1007,6 +1039,11 @@ void Tablet::delete_all_files() {
         it.second->remove();
     }
     _stale_rs_version_map.clear();
+
+    if (keys_type() == UNIQUE_KEYS && enable_unique_key_merge_on_write()) {
+        // clear rowset_tree
+        _rowset_tree = std::make_unique<RowsetTree>();
+    }
 }
 
 bool Tablet::check_path(const std::string& path_to_check) const {
@@ -1817,9 +1854,56 @@ const TabletSchema& Tablet::tablet_schema() const {
     return *rowset_meta->tablet_schema();
 }
 
-Status Tablet::lookup_row_key(const Slice& encoded_key, RowLocation* row_location) {
-    // TODO(zhannngchen): to be implemented in next patch, align with rowset-tree usage and
-    // update.
+// Finds `encoded_key` in the newest rowset whose end version is <= `version` and stores
+// its location in `row_location`. Candidate (rowset, segment id) pairs come from the rowset
+// tree's key-range index and are probed newest-first. Returns NotFound when no visible
+// segment holds the key; any other segment load/lookup error is returned as-is.
+//
+// NOTE(review): add_rowset/modify_rowsets/add_inc_rowset replace `_rowset_tree` wholesale;
+// confirm callers serialize this lookup with those updates, otherwise the tree may be
+// destroyed while it is being read.
+Status Tablet::lookup_row_key(const Slice& encoded_key, RowLocation* row_location,
+                              uint32_t version) {
+    // The rowset tree is only built for UNIQUE_KEYS tablets with merge-on-write enabled;
+    // honour the "not supported" contract from tablet.h instead of dereferencing null.
+    if (_rowset_tree == nullptr) {
+        return Status::NotSupported("lookup_row_key requires unique key merge-on-write");
+    }
+    std::vector<std::pair<RowsetSharedPtr, int32_t>> selected_rs;
+    _rowset_tree->FindRowsetsWithKeyInRange(encoded_key, &selected_rs);
+    if (selected_rs.empty()) {
+        return Status::NotFound("No rowsets contains the key in key range");
+    }
+    // Usually newly written data has a higher probability of being modified, so prefer
+    // to search the key in the rowset with larger version.
+    std::sort(selected_rs.begin(), selected_rs.end(),
+              [](const std::pair<RowsetSharedPtr, int32_t>& a,
+                 const std::pair<RowsetSharedPtr, int32_t>& b) {
+                  return a.first->end_version() > b.first->end_version();
+              });
+    RowLocation loc;
+    for (const auto& rs : selected_rs) {
+        // Rowsets newer than the requested version are not visible to this lookup.
+        if (rs.first->end_version() > version) {
+            continue;
+        }
+        SegmentCacheHandle segment_cache_handle;
+        RETURN_NOT_OK(SegmentLoader::instance()->load_segments(
+                std::static_pointer_cast<BetaRowset>(rs.first), &segment_cache_handle, true));
+        auto& segments = segment_cache_handle.get_segments();
+        DCHECK_GT(segments.size(), static_cast<size_t>(rs.second));
+        Status s = segments[rs.second]->lookup_row_key(encoded_key, &loc);
+        if (s.is_not_found()) {
+            continue;
+        }
+        if (!s.ok()) {
+            return s;
+        }
+        loc.rowset_id = rs.first->rowset_id();
+        // TODO: consult the delete bitmap here; the row may already be marked deleted.
+        *row_location = loc;
+        return s;
+    }
     return Status::NotFound("can't find key in all rowsets");
 }
 
diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h
index 23aaeaed55..622495a12c 100644
--- a/be/src/olap/tablet.h
+++ b/be/src/olap/tablet.h
@@ -33,6 +33,7 @@
 #include "olap/olap_define.h"
 #include "olap/rowset/rowset.h"
 #include "olap/rowset/rowset_reader.h"
+#include "olap/rowset/rowset_tree.h"
 #include "olap/tablet_meta.h"
 #include "olap/tuple.h"
 #include "olap/utils.h"
@@ -307,7 +308,7 @@ public:
     // Lookup the row location of `encoded_key`, the function sets 
`row_location` on success.
     // NOTE: the method only works in unique key model with primary key index, 
you will got a
     //       not supported error in other data model.
-    Status lookup_row_key(const Slice& encoded_key, RowLocation* row_location);
+    Status lookup_row_key(const Slice& encoded_key, RowLocation* row_location, 
uint32_t version);
 
 private:
     Status _init_once_action();
@@ -362,6 +363,9 @@ private:
     // These _stale rowsets are been removed when rowsets' pathVersion is 
expired,
     // this policy is judged and computed by TimestampedVersionTracker.
     std::unordered_map<Version, RowsetSharedPtr, HashOfVersion> 
_stale_rs_version_map;
+    // RowsetTree is used to locate rowsets containing a key or a key range 
quickly.
+    // It's only used in UNIQUE_KEYS data model.
+    std::unique_ptr<RowsetTree> _rowset_tree;
     // if this tablet is broken, set to true. default is false
     std::atomic<bool> _is_bad;
     // timestamp of last cumu compaction failure
diff --git a/be/test/olap/tablet_test.cpp b/be/test/olap/tablet_test.cpp
index d819fc3a0a..e5240b27d3 100644
--- a/be/test/olap/tablet_test.cpp
+++ b/be/test/olap/tablet_test.cpp
@@ -25,6 +25,7 @@
 #include "olap/rowset/beta_rowset.h"
 #include "olap/storage_policy_mgr.h"
 #include "olap/tablet_meta.h"
+#include "testutil/mock_rowset.h"
 #include "util/time.h"
 
 using namespace std;
@@ -38,9 +39,7 @@ public:
     virtual ~TestTablet() {}
 
     virtual void SetUp() {
-        _tablet_meta = static_cast<TabletMetaSharedPtr>(new TabletMeta(
-                1, 2, 15673, 15674, 4, 5, TTabletSchema(), 6, {{7, 8}}, 
UniqueId(9, 10),
-                TTabletType::TABLET_TYPE_DISK, TCompressionType::LZ4F));
+        _tablet_meta = new_tablet_meta(TTabletSchema());
         _json_rowset_meta = R"({
             "rowset_id": 540081,
             "tablet_id": 15673,
@@ -93,6 +92,17 @@ public:

     virtual void TearDown() {}

+    // Creates a TabletMeta for these tests with fixed ids, a disk tablet type, LZ4F
+    // compression and the given `schema`. `enable_merge_on_write` is passed through as the
+    // last TabletMeta constructor argument.
+    TabletMetaSharedPtr new_tablet_meta(const TTabletSchema& schema,
+                                        bool enable_merge_on_write = false) {
+        return static_cast<TabletMetaSharedPtr>(
+                new TabletMeta(1, 2, 15673, 15674, 4, 5, schema, 6, {{7, 8}}, UniqueId(9, 10),
+                               TTabletType::TABLET_TYPE_DISK, TCompressionType::LZ4F, std::string(),
+                               enable_merge_on_write));
+    }
+
     void init_rs_meta(RowsetMetaSharedPtr& pb1, int64_t start, int64_t end) {
         pb1->init_from_json(_json_rowset_meta);
         pb1->set_start_version(start);
@@ -111,6 +117,19 @@ public:
         pb1->set_num_segments(2);
     }

+    // Initializes `pb1` from the fixture's JSON rowset meta with versions [start, end] and
+    // one segment per element of `keybounds` (each element holds that segment's min/max
+    // key, as produced by convert_key_bounds).
+    void init_rs_meta(RowsetMetaSharedPtr& pb1, int64_t start, int64_t end,
+                      const std::vector<KeyBoundsPB>& keybounds) {
+        pb1->init_from_json(_json_rowset_meta);
+        pb1->set_start_version(start);
+        pb1->set_end_version(end);
+        pb1->set_creation_time(10000);
+        pb1->set_segments_key_bounds(keybounds);
+        pb1->set_num_segments(keybounds.size());
+    }
+
     void init_all_rs_meta(std::vector<RowsetMetaSharedPtr>* rs_metas) {
         RowsetMetaSharedPtr ptr1(new RowsetMeta());
         init_rs_meta(ptr1, 0, 0);
@@ -176,6 +192,20 @@ public:
         rs_metas->push_back(v5);
     }

+    // Converts (min_key, max_key) pairs into KeyBoundsPB messages, preserving order.
+    std::vector<KeyBoundsPB> convert_key_bounds(
+            const std::vector<std::pair<std::string, std::string>>& key_pairs) {
+        std::vector<KeyBoundsPB> res;
+        res.reserve(key_pairs.size());
+        for (const auto& pair : key_pairs) {
+            KeyBoundsPB key_bounds;
+            key_bounds.set_min_key(pair.first);
+            key_bounds.set_max_key(pair.second);
+            res.push_back(std::move(key_bounds));
+        }
+        return res;
+    }
+
 protected:
     std::string _json_rowset_meta;
     TabletMetaSharedPtr _tablet_meta;
@@ -329,4 +357,57 @@ TEST_F(TestTablet, cooldown_policy) {
     }
 }

+// Checks that rowsets added through add_inc_rowset are indexed by the rowset tree, and
+// that lookup_row_key prunes candidates by key range and by version before loading data.
+TEST_F(TestTablet, rowset_tree_update) {
+    TTabletSchema tschema;
+    tschema.keys_type = TKeysType::UNIQUE_KEYS;
+    TabletMetaSharedPtr tablet_meta = new_tablet_meta(tschema, true);
+    TabletSharedPtr tablet(new Tablet(tablet_meta, nullptr));
+    ASSERT_TRUE(tablet->init().ok());
+
+    // Rowset 1: versions [6, 7], segments ["100", "200"] and ["300", "400"].
+    RowsetMetaSharedPtr rsm1(new RowsetMeta());
+    init_rs_meta(rsm1, 6, 7, convert_key_bounds({{"100", "200"}, {"300", "400"}}));
+    RowsetId id1;
+    id1.init(10010);
+    rsm1->set_rowset_id(id1);
+    RowsetSharedPtr rs_ptr1;
+    MockRowset::create_rowset(&tablet_meta->tablet_schema(), "", rsm1, &rs_ptr1, false);
+    ASSERT_TRUE(tablet->add_inc_rowset(rs_ptr1).ok());
+
+    // Rowset 2: version [8, 8], segment ["500", "999"].
+    RowsetMetaSharedPtr rsm2(new RowsetMeta());
+    init_rs_meta(rsm2, 8, 8, convert_key_bounds({{"500", "999"}}));
+    RowsetId id2;
+    id2.init(10086);
+    rsm2->set_rowset_id(id2);
+    RowsetSharedPtr rs_ptr2;
+    MockRowset::create_rowset(&tablet_meta->tablet_schema(), "", rsm2, &rs_ptr2, false);
+    ASSERT_TRUE(tablet->add_inc_rowset(rs_ptr2).ok());
+
+    RowLocation loc;
+    // Key not in range.
+    ASSERT_TRUE(tablet->lookup_row_key("99", &loc, 7).is_not_found());
+    // Version too low.
+    ASSERT_TRUE(tablet->lookup_row_key("101", &loc, 3).is_not_found());
+    // Hit a segment, but since we don't have real data, return an internal error when loading the
+    // segment.
+    ASSERT_TRUE(tablet->lookup_row_key("101", &loc, 7).precise_code() ==
+                OLAP_ERR_ROWSET_LOAD_FAILED);
+    // Key not in range.
+    ASSERT_TRUE(tablet->lookup_row_key("201", &loc, 7).is_not_found());
+    // Hit the second segment of rowset 1; loading it fails for the same reason.
+    ASSERT_TRUE(tablet->lookup_row_key("300", &loc, 7).precise_code() ==
+                OLAP_ERR_ROWSET_LOAD_FAILED);
+    // Key not in range.
+    ASSERT_TRUE(tablet->lookup_row_key("499", &loc, 7).is_not_found());
+    // Version too low.
+    ASSERT_TRUE(tablet->lookup_row_key("500", &loc, 7).is_not_found());
+    // Hit a segment, but since we don't have real data, return an internal error when loading the
+    // segment.
+    ASSERT_TRUE(tablet->lookup_row_key("500", &loc, 8).precise_code() ==
+                OLAP_ERR_ROWSET_LOAD_FAILED);
+}
+
 } // namespace doris


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to