This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new ec5471f048 [feature-wip](unique-key-merge-on-write) Implement tablet
lookup interface, using rowset-tree, DSIP-018[3/5] (#10938)
ec5471f048 is described below
commit ec5471f048f85fd8237c128acc9cc06cce9fe82c
Author: zhannngchen <[email protected]>
AuthorDate: Wed Jul 20 14:52:14 2022 +0800
[feature-wip](unique-key-merge-on-write) Implement tablet lookup interface,
using rowset-tree, DSIP-018[3/5] (#10938)
---
be/src/olap/tablet.cpp | 76 +++++++++++++++++++++++++++++++++++++++--
be/src/olap/tablet.h | 6 +++-
be/test/olap/tablet_test.cpp | 81 ++++++++++++++++++++++++++++++++++++++++++--
3 files changed, 156 insertions(+), 7 deletions(-)
diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp
index ec2fa0050f..bf71ed1e28 100644
--- a/be/src/olap/tablet.cpp
+++ b/be/src/olap/tablet.cpp
@@ -52,6 +52,7 @@
#include "olap/storage_policy_mgr.h"
#include "olap/tablet_meta.h"
#include "olap/tablet_meta_manager.h"
+#include "segment_loader.h"
#include "util/path_util.h"
#include "util/pretty_printer.h"
#include "util/scoped_cleanup.h"
@@ -111,6 +112,7 @@ Status Tablet::_init_once_action() {
_cumulative_compaction_type);
#endif
+ RowsetVector rowset_vec;
for (const auto& rs_meta : _tablet_meta->all_rs_metas()) {
Version version = rs_meta->version();
RowsetSharedPtr rowset;
@@ -121,6 +123,7 @@ Status Tablet::_init_once_action() {
<< ", res=" << res;
return res;
}
+ rowset_vec.push_back(rowset);
_rs_version_map[version] = std::move(rowset);
}
@@ -138,6 +141,10 @@ Status Tablet::_init_once_action() {
_stale_rs_version_map[version] = std::move(rowset);
}
+ if (_schema.keys_type() == UNIQUE_KEYS &&
enable_unique_key_merge_on_write()) {
+ _rowset_tree = std::make_unique<RowsetTree>();
+ res = _rowset_tree->Init(rowset_vec);
+ }
return res;
}
@@ -235,6 +242,13 @@ Status Tablet::add_rowset(RowsetSharedPtr rowset) {
_rs_version_map[rowset->version()] = rowset;
_timestamped_version_tracker.add_version(rowset->version());
+ // Update rowset tree
+ if (keys_type() == UNIQUE_KEYS && enable_unique_key_merge_on_write()) {
+ auto new_rowset_tree = std::make_unique<RowsetTree>();
+ ModifyRowSetTree(*_rowset_tree, {}, {rowset}, new_rowset_tree.get());
+ _rowset_tree = std::move(new_rowset_tree);
+ }
+
std::vector<RowsetSharedPtr> rowsets_to_delete;
// yiguolei: temp code, should remove the rowset contains by this rowset
// but it should be removed in multi path version
@@ -267,6 +281,10 @@ Status
Tablet::modify_rowsets(std::vector<RowsetSharedPtr>& to_add,
// In this case, we no longer need to add the rowset in "to_delete" to
// _stale_rs_version_map, but can delete it directly.
+ if (to_add.empty() && to_delete.empty()) {
+ return Status::OK();
+ }
+
bool same_version = true;
std::sort(to_add.begin(), to_add.end(), Rowset::comparator);
std::sort(to_delete.begin(), to_delete.end(), Rowset::comparator);
@@ -323,6 +341,13 @@ Status
Tablet::modify_rowsets(std::vector<RowsetSharedPtr>& to_add,
_tablet_meta->modify_rs_metas(rs_metas_to_add, rs_metas_to_delete,
same_version);
+ // Update rowset tree
+ if (keys_type() == UNIQUE_KEYS && enable_unique_key_merge_on_write()) {
+ auto new_rowset_tree = std::make_unique<RowsetTree>();
+ ModifyRowSetTree(*_rowset_tree, to_delete, to_add,
new_rowset_tree.get());
+ _rowset_tree = std::move(new_rowset_tree);
+ }
+
if (!same_version) {
// add rs_metas_to_delete to tracker
_timestamped_version_tracker.add_stale_path_version(rs_metas_to_delete);
@@ -410,6 +435,13 @@ Status Tablet::add_inc_rowset(const RowsetSharedPtr&
rowset) {
RETURN_NOT_OK(_tablet_meta->add_rs_meta(rowset->rowset_meta()));
_rs_version_map[rowset->version()] = rowset;
+ // Update rowset tree
+ if (keys_type() == UNIQUE_KEYS && enable_unique_key_merge_on_write()) {
+ auto new_rowset_tree = std::make_unique<RowsetTree>();
+ ModifyRowSetTree(*_rowset_tree, {}, {rowset}, new_rowset_tree.get());
+ _rowset_tree = std::move(new_rowset_tree);
+ }
+
_timestamped_version_tracker.add_version(rowset->version());
++_newly_created_rowset_num;
@@ -1007,6 +1039,11 @@ void Tablet::delete_all_files() {
it.second->remove();
}
_stale_rs_version_map.clear();
+
+ if (keys_type() == UNIQUE_KEYS && enable_unique_key_merge_on_write()) {
+ // clear rowset_tree
+ _rowset_tree = std::make_unique<RowsetTree>();
+ }
}
bool Tablet::check_path(const std::string& path_to_check) const {
@@ -1817,9 +1854,42 @@ const TabletSchema& Tablet::tablet_schema() const {
return *rowset_meta->tablet_schema();
}
-Status Tablet::lookup_row_key(const Slice& encoded_key, RowLocation* row_location) {
-    // TODO(zhannngchen): to be implemented in next patch, align with rowset-tree usage and
-    // update.
+// Lookup the row location of `encoded_key` among the rowsets visible at `version`, i.e. the
+// rowsets whose end version is not greater than `version`. Candidate (rowset, segment) pairs
+// come from `_rowset_tree`, which is only built for UNIQUE_KEYS tablets with merge-on-write
+// enabled. Rowsets are probed from the largest end version downwards and the first hit wins.
+Status Tablet::lookup_row_key(const Slice& encoded_key, RowLocation* row_location,
+                              uint32_t version) {
+    // `_rowset_tree` stays null unless the tablet is unique-key merge-on-write; report that as
+    // unsupported (as promised by the declaration in tablet.h) instead of dereferencing null.
+    if (_rowset_tree == nullptr) {
+        return Status::NotSupported(
+                "lookup_row_key is only supported for unique key merge-on-write tablets");
+    }
+    std::vector<std::pair<RowsetSharedPtr, int32_t>> selected_rs;
+    _rowset_tree->FindRowsetsWithKeyInRange(encoded_key, &selected_rs);
+    if (selected_rs.empty()) {
+        return Status::NotFound("No rowsets contains the key in key range");
+    }
+    // Usually newly written data has a higher probability of being modified, so prefer
+    // to search the key in the rowset with larger version.
+    std::sort(selected_rs.begin(), selected_rs.end(),
+              [](const std::pair<RowsetSharedPtr, int32_t>& a,
+                 const std::pair<RowsetSharedPtr, int32_t>& b) {
+                  return a.first->end_version() > b.first->end_version();
+              });
+    RowLocation loc;
+    for (const auto& rs : selected_rs) {
+        // Rowsets newer than the requested version are not visible to this lookup.
+        if (rs.first->end_version() > version) {
+            continue;
+        }
+        SegmentCacheHandle segment_cache_handle;
+        RETURN_NOT_OK(SegmentLoader::instance()->load_segments(
+                std::static_pointer_cast<BetaRowset>(rs.first), &segment_cache_handle, true));
+        auto& segments = segment_cache_handle.get_segments();
+        // rs.second indexes into this rowset's segments (presumably the segment whose key
+        // bounds may contain `encoded_key` -- see RowsetTree).
+        DCHECK_GT(segments.size(), rs.second);
+        Status s = segments[rs.second]->lookup_row_key(encoded_key, &loc);
+        if (s.is_not_found()) {
+            continue;
+        }
+        if (!s.ok()) {
+            return s;
+        }
+        loc.rowset_id = rs.first->rowset_id();
+        // TODO: check the delete bitmap before accepting this hit; it is not done yet.
+        *row_location = loc;
+        return s;
+    }
     return Status::NotFound("can't find key in all rowsets");
 }
diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h
index 23aaeaed55..622495a12c 100644
--- a/be/src/olap/tablet.h
+++ b/be/src/olap/tablet.h
@@ -33,6 +33,7 @@
#include "olap/olap_define.h"
#include "olap/rowset/rowset.h"
#include "olap/rowset/rowset_reader.h"
+#include "olap/rowset/rowset_tree.h"
#include "olap/tablet_meta.h"
#include "olap/tuple.h"
#include "olap/utils.h"
@@ -307,7 +308,7 @@ public:
// Lookup the row location of `encoded_key`, the function sets
`row_location` on success.
// NOTE: the method only works in unique key model with primary key index,
you will got a
// not supported error in other data model.
- Status lookup_row_key(const Slice& encoded_key, RowLocation* row_location);
+ Status lookup_row_key(const Slice& encoded_key, RowLocation* row_location,
uint32_t version);
private:
Status _init_once_action();
@@ -362,6 +363,9 @@ private:
// These _stale rowsets are been removed when rowsets' pathVersion is
expired,
// this policy is judged and computed by TimestampedVersionTracker.
std::unordered_map<Version, RowsetSharedPtr, HashOfVersion>
_stale_rs_version_map;
+ // RowsetTree is used to locate rowsets containing a key or a key range
quickly.
+ // It's only used in UNIQUE_KEYS data model.
+ std::unique_ptr<RowsetTree> _rowset_tree;
// if this tablet is broken, set to true. default is false
std::atomic<bool> _is_bad;
// timestamp of last cumu compaction failure
diff --git a/be/test/olap/tablet_test.cpp b/be/test/olap/tablet_test.cpp
index d819fc3a0a..e5240b27d3 100644
--- a/be/test/olap/tablet_test.cpp
+++ b/be/test/olap/tablet_test.cpp
@@ -25,6 +25,7 @@
#include "olap/rowset/beta_rowset.h"
#include "olap/storage_policy_mgr.h"
#include "olap/tablet_meta.h"
+#include "testutil/mock_rowset.h"
#include "util/time.h"
using namespace std;
@@ -38,9 +39,7 @@ public:
virtual ~TestTablet() {}
virtual void SetUp() {
- _tablet_meta = static_cast<TabletMetaSharedPtr>(new TabletMeta(
- 1, 2, 15673, 15674, 4, 5, TTabletSchema(), 6, {{7, 8}},
UniqueId(9, 10),
- TTabletType::TABLET_TYPE_DISK, TCompressionType::LZ4F));
+ _tablet_meta = new_tablet_meta(TTabletSchema());
_json_rowset_meta = R"({
"rowset_id": 540081,
"tablet_id": 15673,
@@ -93,6 +92,13 @@ public:
virtual void TearDown() {}
+    // Build the fixture's TabletMeta. All ids, versions and storage settings are fixed;
+    // callers choose only the schema and whether merge-on-write is enabled.
+    TabletMetaSharedPtr new_tablet_meta(TTabletSchema schema, bool enable_merge_on_write = false) {
+        auto* meta = new TabletMeta(1, 2, 15673, 15674, 4, 5, schema, 6, {{7, 8}}, UniqueId(9, 10),
+                                    TTabletType::TABLET_TYPE_DISK, TCompressionType::LZ4F,
+                                    std::string(), enable_merge_on_write);
+        return TabletMetaSharedPtr(meta);
+    }
+
+
void init_rs_meta(RowsetMetaSharedPtr& pb1, int64_t start, int64_t end) {
pb1->init_from_json(_json_rowset_meta);
pb1->set_start_version(start);
@@ -111,6 +117,16 @@ public:
pb1->set_num_segments(2);
}
+    // Initialize `pb1` from the fixture's JSON rowset meta, override its version range and
+    // creation time, and give it one segment per entry of `keybounds` with those key bounds.
+    // `keybounds` is taken by const reference to avoid copying the vector of protobufs.
+    void init_rs_meta(RowsetMetaSharedPtr& pb1, int64_t start, int64_t end,
+                      const std::vector<KeyBoundsPB>& keybounds) {
+        pb1->init_from_json(_json_rowset_meta);
+        pb1->set_start_version(start);
+        pb1->set_end_version(end);
+        pb1->set_creation_time(10000);
+        pb1->set_segments_key_bounds(keybounds);
+        pb1->set_num_segments(keybounds.size());
+    }
+
+
void init_all_rs_meta(std::vector<RowsetMetaSharedPtr>* rs_metas) {
RowsetMetaSharedPtr ptr1(new RowsetMeta());
init_rs_meta(ptr1, 0, 0);
@@ -176,6 +192,18 @@ public:
rs_metas->push_back(v5);
}
+    // Convert (min_key, max_key) pairs into KeyBoundsPB messages, one per pair, preserving
+    // the input order. Pairs are read by const reference so no strings are copied per item.
+    std::vector<KeyBoundsPB> convert_key_bounds(
+            const std::vector<std::pair<std::string, std::string>>& key_pairs) {
+        std::vector<KeyBoundsPB> res;
+        res.reserve(key_pairs.size());
+        for (const auto& pair : key_pairs) {
+            KeyBoundsPB key_bounds;
+            key_bounds.set_min_key(pair.first);
+            key_bounds.set_max_key(pair.second);
+            res.push_back(std::move(key_bounds));
+        }
+        return res;
+    }
+
+
protected:
std::string _json_rowset_meta;
TabletMetaSharedPtr _tablet_meta;
@@ -329,4 +357,51 @@ TEST_F(TestTablet, cooldown_policy) {
}
}
+TEST_F(TestTablet, rowset_tree_update) {
+    TTabletSchema tschema;
+    tschema.keys_type = TKeysType::UNIQUE_KEYS;
+    TabletMetaSharedPtr tablet_meta = new_tablet_meta(tschema, true);
+    TabletSharedPtr tablet(new Tablet(tablet_meta, nullptr));
+    tablet->init();
+
+    // Rowset of versions [6-7] with two segments: ["100", "200"] and ["300", "400"].
+    RowsetMetaSharedPtr rsm1(new RowsetMeta());
+    init_rs_meta(rsm1, 6, 7, convert_key_bounds({{"100", "200"}, {"300", "400"}}));
+    RowsetId id1;
+    id1.init(10010);
+    rsm1->set_rowset_id(id1);
+    RowsetSharedPtr rs_ptr1;
+    MockRowset::create_rowset(&tablet_meta->tablet_schema(), "", rsm1, &rs_ptr1, false);
+    tablet->add_inc_rowset(rs_ptr1);
+
+    // Rowset of version [8-8] with one segment: ["500", "999"].
+    RowsetMetaSharedPtr rsm2(new RowsetMeta());
+    init_rs_meta(rsm2, 8, 8, convert_key_bounds({{"500", "999"}}));
+    RowsetId id2;
+    id2.init(10086);
+    rsm2->set_rowset_id(id2);
+    RowsetSharedPtr rs_ptr2;
+    MockRowset::create_rowset(&tablet_meta->tablet_schema(), "", rsm2, &rs_ptr2, false);
+    tablet->add_inc_rowset(rs_ptr2);
+
+    // Keys are compared bytewise, so e.g. "99" would fall inside ["500", "999"]. The
+    // "not in range" probes below use version 8 so that every rowset is visible and only the
+    // key range can explain a NotFound.
+    RowLocation loc;
+    // Key not in range (below every segment's min key).
+    ASSERT_TRUE(tablet->lookup_row_key("099", &loc, 8).is_not_found());
+    // Version too low.
+    ASSERT_TRUE(tablet->lookup_row_key("101", &loc, 3).is_not_found());
+    // Hit a segment, but since we don't have real data, return an internal error when loading the
+    // segment.
+    ASSERT_TRUE(tablet->lookup_row_key("101", &loc, 7).precise_code() ==
+                OLAP_ERR_ROWSET_LOAD_FAILED);
+    // Key not in range (between the two segments of rowset [6-7]).
+    ASSERT_TRUE(tablet->lookup_row_key("201", &loc, 8).is_not_found());
+    ASSERT_TRUE(tablet->lookup_row_key("300", &loc, 7).precise_code() ==
+                OLAP_ERR_ROWSET_LOAD_FAILED);
+    // Key not in range (between rowset [6-7] and rowset [8-8]).
+    ASSERT_TRUE(tablet->lookup_row_key("499", &loc, 8).is_not_found());
+    // Version too low.
+    ASSERT_TRUE(tablet->lookup_row_key("500", &loc, 7).is_not_found());
+    // Hit a segment, but since we don't have real data, return an internal error when loading the
+    // segment.
+    ASSERT_TRUE(tablet->lookup_row_key("500", &loc, 8).precise_code() ==
+                OLAP_ERR_ROWSET_LOAD_FAILED);
+}
+
} // namespace doris
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]