This is an automated email from the ASF dual-hosted git repository.
zouxinyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new a1758bd139 [feature-wip](unique-key-merge-on-write) Add agg cache for
delete bitmap DSIP-018 (#10921)
a1758bd139 is described below
commit a1758bd1394a4031a97d09770b0c938867404300
Author: Compilation Success <[email protected]>
AuthorDate: Thu Jul 21 12:48:44 2022 +0800
[feature-wip](unique-key-merge-on-write) Add agg cache for delete bitmap
DSIP-018 (#10921)
Use global LRU for delete bitmap cache
---
be/src/common/config.h | 3 ++
be/src/olap/tablet_meta.cpp | 79 +++++++++++++++++++++++++++++++++++++--
be/src/olap/tablet_meta.h | 55 ++++++++++++++++++++++++++-
be/test/olap/tablet_meta_test.cpp | 55 +++++++++++++++++++++++++--
4 files changed, 183 insertions(+), 9 deletions(-)
diff --git a/be/src/common/config.h b/be/src/common/config.h
index d0fb58de5a..12e35d2574 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -708,6 +708,9 @@ CONF_mInt32(external_table_connect_timeout_sec, "5");
// So the value of this config should corresponding to the number of rowsets
on this BE.
CONF_mInt32(segment_cache_capacity, "1000000");
+// Capacity, in bytes, of the global LRU cache that holds aggregated delete bitmaps
+CONF_Int64(delete_bitmap_agg_cache_capacity, "104857600");
+
// s3 config
CONF_mInt32(max_remote_storage_count, "10");
diff --git a/be/src/olap/tablet_meta.cpp b/be/src/olap/tablet_meta.cpp
index ad4d4b104c..18dd73f8bc 100644
--- a/be/src/olap/tablet_meta.cpp
+++ b/be/src/olap/tablet_meta.cpp
@@ -51,7 +51,9 @@ Status TabletMeta::create(const TCreateTabletReq& request,
const TabletUid& tabl
}
TabletMeta::TabletMeta()
- : _tablet_uid(0, 0), _schema(new TabletSchema), _delete_bitmap(new
DeleteBitmap()) {}
+ : _tablet_uid(0, 0),
+ _schema(new TabletSchema),
+ _delete_bitmap(new DeleteBitmap(_tablet_id)) {} // NOTE(review): reads the member _tablet_id during member initialization; assumes it is declared before _delete_bitmap and has a default member initializer — TODO confirm
TabletMeta::TabletMeta(int64_t table_id, int64_t partition_id, int64_t
tablet_id,
int64_t replica_id, int32_t schema_hash, uint64_t
shard_id,
@@ -60,7 +62,9 @@ TabletMeta::TabletMeta(int64_t table_id, int64_t
partition_id, int64_t tablet_id
TabletUid tablet_uid, TTabletType::type tabletType,
TCompressionType::type compression_type, const
std::string& storage_policy,
bool enable_unique_key_merge_on_write)
- : _tablet_uid(0, 0), _schema(new TabletSchema), _delete_bitmap(new
DeleteBitmap()) {
+ : _tablet_uid(0, 0),
+ _schema(new TabletSchema),
+ _delete_bitmap(new DeleteBitmap(tablet_id)) {
TabletMetaPB tablet_meta_pb;
tablet_meta_pb.set_table_id(table_id);
tablet_meta_pb.set_partition_id(partition_id);
@@ -203,7 +207,7 @@ TabletMeta::TabletMeta(const TabletMeta& b)
_in_restore_mode(b._in_restore_mode),
_preferred_rowset_type(b._preferred_rowset_type),
_cooldown_resource(b._cooldown_resource),
- _delete_bitmap(new DeleteBitmap(*b._delete_bitmap)) {};
+ _delete_bitmap(b._delete_bitmap) {};
void TabletMeta::init_column_from_tcolumn(uint32_t unique_id, const TColumn&
tcolumn,
ColumnPB* column) {
@@ -757,23 +761,29 @@ bool operator!=(const TabletMeta& a, const TabletMeta& b)
{
return !(a == b);
}
-DeleteBitmap::DeleteBitmap() {}
+// Creates an empty delete bitmap bound to `tablet_id` and attaches an AggCache
+// wrapper sized by config::delete_bitmap_agg_cache_capacity.
+DeleteBitmap::DeleteBitmap(int64_t tablet_id)
+        : _agg_cache(new AggCache(config::delete_bitmap_agg_cache_capacity)),
+          _tablet_id(tablet_id) {}
DeleteBitmap::DeleteBitmap(const DeleteBitmap& o) {
delete_bitmap = o.delete_bitmap; // just copy data
+ _tablet_id = o._tablet_id;
}
DeleteBitmap& DeleteBitmap::operator=(const DeleteBitmap& o) {
delete_bitmap = o.delete_bitmap; // just copy data
+ _tablet_id = o._tablet_id;
return *this;
}
DeleteBitmap::DeleteBitmap(DeleteBitmap&& o) {
delete_bitmap = std::move(o.delete_bitmap);
+ _tablet_id = o._tablet_id;
}
DeleteBitmap& DeleteBitmap::operator=(DeleteBitmap&& o) {
delete_bitmap = std::move(o.delete_bitmap);
+ _tablet_id = o._tablet_id;
return *this;
}
@@ -812,6 +822,10 @@ bool DeleteBitmap::contains(const BitmapKey& bmk, uint32_t
row_id) const {
return it != delete_bitmap.end() && it->second.contains(row_id);
}
+// Returns true if `row_id` is present in the aggregated bitmap that get_agg()
+// yields for `bmk`.
+bool DeleteBitmap::contains_agg(const BitmapKey& bmk, uint32_t row_id) const {
+    const std::shared_ptr<roaring::Roaring> aggregated = get_agg(bmk);
+    return aggregated->contains(row_id);
+}
+
int DeleteBitmap::set(const BitmapKey& bmk, const roaring::Roaring&
segment_delete_bitmap) {
std::lock_guard l(lock);
auto [_, inserted] = delete_bitmap.insert_or_assign(bmk,
segment_delete_bitmap);
@@ -855,4 +869,61 @@ void DeleteBitmap::merge(const DeleteBitmap& other) {
}
}
+// Builds the cache key string: a zero-filled buffer holding tablet_id followed by a BitmapKey whose fields are assigned one by one.
+// We cannot just copy the underlying memory of bmk to construct the string, because equivalent objects may have different padding bytes;
+// each field is therefore assigned individually into the zero-filled buffer (presumably so the padding bytes stay zero).
+// NOTE(review): the writes go through reinterpret_cast'ed pointers into std::string storage, which assumes suitable alignment — TODO confirm.
+// Refer to the C11 standard, §6.2.6.1/6 and §6.7.9/21, for more info on padding bytes.
+static std::string agg_cache_key(int64_t tablet_id, const
DeleteBitmap::BitmapKey& bmk) {
+ std::string ret(sizeof(tablet_id) + sizeof(bmk), '\0');
+ *reinterpret_cast<int64_t*>(ret.data()) = tablet_id;
+ auto t = reinterpret_cast<DeleteBitmap::BitmapKey*>(ret.data() +
sizeof(tablet_id));
+ std::get<RowsetId>(*t).version = std::get<RowsetId>(bmk).version;
+ std::get<RowsetId>(*t).hi = std::get<RowsetId>(bmk).hi;
+ std::get<RowsetId>(*t).mi = std::get<RowsetId>(bmk).mi;
+ std::get<RowsetId>(*t).lo = std::get<RowsetId>(bmk).lo;
+ std::get<1>(*t) = std::get<1>(bmk);
+ std::get<2>(*t) = std::get<2>(bmk);
+ return ret;
+}
+
+// Returns the union of every bitmap stored for bmk's (RowsetId, SegmentId) whose
+// Version is <= bmk's Version. The result is served from the global aggregation
+// cache when an entry for (_tablet_id, bmk) exists; otherwise it is computed
+// under a shared lock and inserted into the cache.
+//
+// The returned shared_ptr does not own the bitmap: it points into the cache
+// entry and releases the cache handle when the last copy goes away.
+std::shared_ptr<roaring::Roaring> DeleteBitmap::get_agg(const BitmapKey& bmk) const {
+    // The cache is process-wide, so reach it through the static accessor rather
+    // than through the _agg_cache member, which is not set by every c-tor.
+    ShardedLRUCache* cache = AggCache::repr();
+    std::string key_str = agg_cache_key(_tablet_id, bmk); // Cache key container
+    CacheKey key(key_str);
+    Cache::Handle* handle = cache->lookup(key);
+
+    AggCache::Value* val =
+            handle == nullptr ? nullptr
+                              : reinterpret_cast<AggCache::Value*>(cache->value(handle));
+    // FIXME: do we need a mutex here to get rid of duplicated initializations
+    //        of cache entries in some cases?
+    if (val == nullptr) { // Renew if needed, put a new Value to cache
+        val = new AggCache::Value();
+        {
+            std::shared_lock l(lock);
+            // Keys are ordered, so start at version 0 of this (rowset, segment)
+            // and stop at the first key that leaves the requested range.
+            DeleteBitmap::BitmapKey start {std::get<0>(bmk), std::get<1>(bmk), 0};
+            for (auto it = delete_bitmap.lower_bound(start); it != delete_bitmap.end(); ++it) {
+                auto& [k, bm] = *it;
+                if (std::get<0>(k) != std::get<0>(bmk) || std::get<1>(k) != std::get<1>(bmk) ||
+                    std::get<2>(k) > std::get<2>(bmk)) {
+                    break;
+                }
+                val->bitmap |= bm;
+            }
+        }
+        static auto deleter = [](const CacheKey& key, void* value) {
+            delete static_cast<AggCache::Value*>(value); // Just delete to reclaim
+        };
+        size_t charge = val->bitmap.getSizeInBytes() + sizeof(AggCache::Value);
+        handle = cache->insert(key, val, charge, deleter, CachePriority::NORMAL);
+    }
+
+    // It is natural for the cache to reclaim the underlying memory.
+    // Capture the cache pointer instead of `this`, so the returned bitmap may
+    // safely outlive this DeleteBitmap.
+    return std::shared_ptr<roaring::Roaring>(
+            &val->bitmap, [cache, handle](...) { cache->release(handle); });
+}
+
+std::atomic<ShardedLRUCache*> DeleteBitmap::AggCache::s_repr {nullptr};
+
} // namespace doris
diff --git a/be/src/olap/tablet_meta.h b/be/src/olap/tablet_meta.h
index 60713fa977..7275d0430d 100644
--- a/be/src/olap/tablet_meta.h
+++ b/be/src/olap/tablet_meta.h
@@ -17,6 +17,7 @@
#pragma once
+#include <atomic>
#include <cstddef>
#include <mutex>
#include <shared_mutex>
@@ -245,7 +246,7 @@ private:
// which can avoid the merging cost in read stage, and accelerate the
aggregation
// query performance significantly.
bool _enable_unique_key_merge_on_write = false;
- std::unique_ptr<DeleteBitmap> _delete_bitmap;
+ std::shared_ptr<DeleteBitmap> _delete_bitmap;
mutable std::shared_mutex _meta_lock;
};
@@ -276,7 +277,11 @@ public:
using BitmapKey = std::tuple<RowsetId, SegmentId, Version>;
std::map<BitmapKey, roaring::Roaring> delete_bitmap; // Ordered map
- DeleteBitmap();
+ /**
+ * Creates an empty delete bitmap bound to a tablet.
+ *
+ * @param tablet_id id of the tablet this delete bitmap is associated with
+ */
+ DeleteBitmap(int64_t tablet_id);
/**
* Copy c-tor for making delete bitmap snapshot on read path
@@ -357,6 +362,52 @@ public:
* @param other
*/
void merge(const DeleteBitmap& other);
+
+ /**
+ * Checks whether the given row is marked deleted in any bitmap that satisfies:
+ * its RowsetId and SegmentId are the same as the given ones,
+ * and its Version <= the given Version
+ *
+ * Note: aggregation cache may be used.
+ *
+ * @return true if marked deleted
+ */
+ bool contains_agg(const BitmapKey& bitmap, uint32_t row_id) const;
+
+ /**
+ * Gets the aggregated delete bitmap for the given RowsetId, SegmentId and Version,
+ * with the same effect as:
+ * `select sum(roaring::Roaring) where RowsetId=rowset_id and SegmentId=seg_id and Version <= version`
+ *
+ * @return shared_ptr to a bitmap, which may be empty
+ */
+ std::shared_ptr<roaring::Roaring> get_agg(const BitmapKey& bmk) const;
+
+    /**
+     * Accessor for the process-wide LRU cache of aggregated delete bitmaps.
+     *
+     * The underlying ShardedLRUCache is created exactly once, by the first
+     * AggCache that gets constructed; every instance refers to that same cache.
+     */
+    class AggCache {
+    public:
+        // Payload stored in one cache entry.
+        struct Value {
+            roaring::Roaring bitmap;
+        };
+
+        /**
+         * @param size_in_bytes capacity of the shared cache; only the value seen
+         *        by the first constructed AggCache is used
+         */
+        explicit AggCache(size_t size_in_bytes) {
+            static std::once_flag once;
+            // call_once returns only after the initialization has completed, so
+            // s_repr is already published here and no extra waiting is needed.
+            std::call_once(once, [size_in_bytes] {
+                auto tmp = new ShardedLRUCache("DeleteBitmap AggCache", size_in_bytes,
+                                               LRUCacheType::SIZE, 2048);
+                AggCache::s_repr.store(tmp, std::memory_order_release);
+            });
+        }
+
+        // Returns the shared cache; nullptr until the first AggCache is constructed.
+        static ShardedLRUCache* repr() { return s_repr.load(std::memory_order_acquire); }
+        static std::atomic<ShardedLRUCache*> s_repr;
+    };
+
+private:
+ mutable std::shared_ptr<AggCache> _agg_cache;
+ int64_t _tablet_id;
};
static const std::string SEQUENCE_COL = "__DORIS_SEQUENCE_COL__";
diff --git a/be/test/olap/tablet_meta_test.cpp
b/be/test/olap/tablet_meta_test.cpp
index 28e8654b56..f8b83329ec 100644
--- a/be/test/olap/tablet_meta_test.cpp
+++ b/be/test/olap/tablet_meta_test.cpp
@@ -42,7 +42,7 @@ TEST(TabletMetaTest, SaveAndParse) {
}
TEST(TabletMetaTest, TestDeleteBitmap) {
- std::unique_ptr<DeleteBitmap> dbmp(new DeleteBitmap());
+ std::unique_ptr<DeleteBitmap> dbmp(new DeleteBitmap(10086));
auto gen1 = [&dbmp](int64_t max_rst_id, uint32_t max_seg_id, uint32_t
max_row) {
for (int64_t i = 0; i < max_rst_id; ++i) {
for (uint32_t j = 0; j < max_seg_id; ++j) {
@@ -96,7 +96,7 @@ TEST(TabletMetaTest, TestDeleteBitmap) {
}
{
- DeleteBitmap db_upper;
+ DeleteBitmap db_upper(10086);
dbmp->subset({RowsetId {2, 0, 1, 1}, 1, 0}, {RowsetId {2, 0, 1, 1},
1000000, 0}, &db_upper);
roaring::Roaring d;
ASSERT_EQ(db_upper.get({RowsetId {2, 0, 1, 1}, 1, 1}, &d), 0);
@@ -109,7 +109,7 @@ TEST(TabletMetaTest, TestDeleteBitmap) {
{
auto old_size = dbmp->delete_bitmap.size();
// test merge
- DeleteBitmap other;
+ DeleteBitmap other(10086);
other.add({RowsetId {2, 0, 1, 1}, 1, 1}, 1100);
dbmp->merge(other);
ASSERT_EQ(dbmp->delete_bitmap.size(), old_size);
@@ -118,6 +118,55 @@ TEST(TabletMetaTest, TestDeleteBitmap) {
dbmp->merge(other);
ASSERT_EQ(dbmp->delete_bitmap.size(), old_size + 2);
}
+
+
////////////////////////////////////////////////////////////////////////////
+ // Cache test
+
////////////////////////////////////////////////////////////////////////////
+ // Aggregation bitmap contains all row ids that are in versions smaller or
+ // equal to the given version, boundary test
+ {
+ auto bm = dbmp->get_agg({RowsetId {2, 0, 1, 1}, 1, 2});
+ ASSERT_EQ(bm->cardinality(), 1005);
+ ASSERT_TRUE(bm->contains(999));
+ ASSERT_TRUE(bm->contains(1100));
+ ASSERT_TRUE(bm->contains(1101));
+ ASSERT_TRUE(bm->contains(1102));
+ ASSERT_TRUE(bm->contains(1103));
+ ASSERT_TRUE(bm->contains(1104));
+ bm = dbmp->get_agg({RowsetId {2, 0, 1, 1}, 1, 2});
+ ASSERT_EQ(bm->cardinality(), 1005);
+ }
+
+ // Aggregation bitmap contains all row ids that are in versions smaller or
+ // equal to the given version, normal test
+ {
+ auto bm = dbmp->get_agg({RowsetId {2, 0, 1, 1}, 1, 1000});
+ ASSERT_EQ(bm->cardinality(), 1005);
+ ASSERT_TRUE(bm->contains(999));
+ ASSERT_TRUE(bm->contains(1100));
+ ASSERT_TRUE(bm->contains(1101));
+ ASSERT_TRUE(bm->contains(1102));
+ ASSERT_TRUE(bm->contains(1103));
+ ASSERT_TRUE(bm->contains(1104));
+ bm = dbmp->get_agg({RowsetId {2, 0, 1, 1}, 1, 1000});
+ ASSERT_EQ(bm->cardinality(), 1005);
+ }
+
+ // Check data is not messed-up
+ ASSERT_TRUE(dbmp->contains({RowsetId {2, 0, 1, 1}, 1, 2}, 1104));
+ ASSERT_FALSE(dbmp->contains({RowsetId {2, 0, 1, 1}, 1, 2}, 1103));
+ ASSERT_TRUE(dbmp->contains_agg({RowsetId {2, 0, 1, 1}, 1, 2}, 1104));
+ ASSERT_TRUE(dbmp->contains_agg({RowsetId {2, 0, 1, 1}, 1, 2}, 1103));
+
+ // Test c-tor of agg cache with global LRU
+ int cached_cardinality = dbmp->get_agg({RowsetId {2, 0, 1, 1}, 1,
2})->cardinality();
+ {
+ // New delete bitmap with old agg cache
+ std::unique_ptr<DeleteBitmap> dbmp(new DeleteBitmap(10086));
+ auto bm = dbmp->get_agg({RowsetId {2, 0, 1, 1}, 1, 2});
+ ASSERT_TRUE(bm->contains(1104));
+ ASSERT_EQ(bm->cardinality(), cached_cardinality);
+ }
}
} // namespace doris
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]