This is an automated email from the ASF dual-hosted git repository.

zouxinyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new a1758bd139 [feature-wip](unique-key-merge-on-write) Add agg cache for delete bitmap DSIP-018 (#10921)
a1758bd139 is described below

commit a1758bd1394a4031a97d09770b0c938867404300
Author: Compilation Success <[email protected]>
AuthorDate: Thu Jul 21 12:48:44 2022 +0800

    [feature-wip](unique-key-merge-on-write) Add agg cache for delete bitmap DSIP-018 (#10921)
    
    Use global LRU for delete bitmap cache
---
 be/src/common/config.h            |  3 ++
 be/src/olap/tablet_meta.cpp       | 79 +++++++++++++++++++++++++++++++++++++--
 be/src/olap/tablet_meta.h         | 55 ++++++++++++++++++++++++++-
 be/test/olap/tablet_meta_test.cpp | 55 +++++++++++++++++++++++++--
 4 files changed, 183 insertions(+), 9 deletions(-)

diff --git a/be/src/common/config.h b/be/src/common/config.h
index d0fb58de5a..12e35d2574 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -708,6 +708,9 @@ CONF_mInt32(external_table_connect_timeout_sec, "5");
 // So the value of this config should corresponding to the number of rowsets on this BE.
 CONF_mInt32(segment_cache_capacity, "1000000");
 
+// Global bitmap cache capacity for aggregation cache, size in bytes
+CONF_Int64(delete_bitmap_agg_cache_capacity, "104857600");
+
 // s3 config
 CONF_mInt32(max_remote_storage_count, "10");
 
diff --git a/be/src/olap/tablet_meta.cpp b/be/src/olap/tablet_meta.cpp
index ad4d4b104c..18dd73f8bc 100644
--- a/be/src/olap/tablet_meta.cpp
+++ b/be/src/olap/tablet_meta.cpp
@@ -51,7 +51,9 @@ Status TabletMeta::create(const TCreateTabletReq& request, const TabletUid& tabl
 }
 
 TabletMeta::TabletMeta()
-        : _tablet_uid(0, 0), _schema(new TabletSchema), _delete_bitmap(new DeleteBitmap()) {}
+        : _tablet_uid(0, 0),
+          _schema(new TabletSchema),
+          _delete_bitmap(new DeleteBitmap(_tablet_id)) {}
 
 TabletMeta::TabletMeta(int64_t table_id, int64_t partition_id, int64_t tablet_id,
                        int64_t replica_id, int32_t schema_hash, uint64_t shard_id,
@@ -60,7 +62,9 @@ TabletMeta::TabletMeta(int64_t table_id, int64_t partition_id, int64_t tablet_id
                        TabletUid tablet_uid, TTabletType::type tabletType,
                        TCompressionType::type compression_type, const std::string& storage_policy,
                        bool enable_unique_key_merge_on_write)
-        : _tablet_uid(0, 0), _schema(new TabletSchema), _delete_bitmap(new DeleteBitmap()) {
+        : _tablet_uid(0, 0),
+          _schema(new TabletSchema),
+          _delete_bitmap(new DeleteBitmap(tablet_id)) {
     TabletMetaPB tablet_meta_pb;
     tablet_meta_pb.set_table_id(table_id);
     tablet_meta_pb.set_partition_id(partition_id);
@@ -203,7 +207,7 @@ TabletMeta::TabletMeta(const TabletMeta& b)
           _in_restore_mode(b._in_restore_mode),
           _preferred_rowset_type(b._preferred_rowset_type),
           _cooldown_resource(b._cooldown_resource),
-          _delete_bitmap(new DeleteBitmap(*b._delete_bitmap)) {};
+          _delete_bitmap(b._delete_bitmap) {};
 
 void TabletMeta::init_column_from_tcolumn(uint32_t unique_id, const TColumn& tcolumn,
                                           ColumnPB* column) {
@@ -757,23 +761,29 @@ bool operator!=(const TabletMeta& a, const TabletMeta& b) {
     return !(a == b);
 }
 
-DeleteBitmap::DeleteBitmap() {}
+DeleteBitmap::DeleteBitmap(int64_t tablet_id) : _tablet_id(tablet_id) {
+    _agg_cache.reset(new AggCache(config::delete_bitmap_agg_cache_capacity));
+}
 
 DeleteBitmap::DeleteBitmap(const DeleteBitmap& o) {
     delete_bitmap = o.delete_bitmap; // just copy data
+    _tablet_id = o._tablet_id;
 }
 
 DeleteBitmap& DeleteBitmap::operator=(const DeleteBitmap& o) {
     delete_bitmap = o.delete_bitmap; // just copy data
+    _tablet_id = o._tablet_id;
     return *this;
 }
 
 DeleteBitmap::DeleteBitmap(DeleteBitmap&& o) {
     delete_bitmap = std::move(o.delete_bitmap);
+    _tablet_id = o._tablet_id;
 }
 
 DeleteBitmap& DeleteBitmap::operator=(DeleteBitmap&& o) {
     delete_bitmap = std::move(o.delete_bitmap);
+    _tablet_id = o._tablet_id;
     return *this;
 }
 
@@ -812,6 +822,10 @@ bool DeleteBitmap::contains(const BitmapKey& bmk, uint32_t row_id) const {
     return it != delete_bitmap.end() && it->second.contains(row_id);
 }
 
+bool DeleteBitmap::contains_agg(const BitmapKey& bmk, uint32_t row_id) const {
+    return get_agg(bmk)->contains(row_id);
+}
+
 int DeleteBitmap::set(const BitmapKey& bmk, const roaring::Roaring& segment_delete_bitmap) {
     std::lock_guard l(lock);
     auto [_, inserted] = delete_bitmap.insert_or_assign(bmk, segment_delete_bitmap);
@@ -855,4 +869,61 @@ void DeleteBitmap::merge(const DeleteBitmap& other) {
     }
 }
 
+// We cannot just copy the underlying memory to construct a string
+// due to equivalent objects may have different padding bytes.
+// Reading padding bytes is undefined behavior, neither copy nor
+// placement new will help simplify the code.
+// Refer to C11 standards §6.2.6.1/6 and §6.7.9/21 for more info.
+static std::string agg_cache_key(int64_t tablet_id, const DeleteBitmap::BitmapKey& bmk) {
+    std::string ret(sizeof(tablet_id) + sizeof(bmk), '\0');
+    *reinterpret_cast<int64_t*>(ret.data()) = tablet_id;
+    auto t = reinterpret_cast<DeleteBitmap::BitmapKey*>(ret.data() + sizeof(tablet_id));
+    std::get<RowsetId>(*t).version = std::get<RowsetId>(bmk).version;
+    std::get<RowsetId>(*t).hi = std::get<RowsetId>(bmk).hi;
+    std::get<RowsetId>(*t).mi = std::get<RowsetId>(bmk).mi;
+    std::get<RowsetId>(*t).lo = std::get<RowsetId>(bmk).lo;
+    std::get<1>(*t) = std::get<1>(bmk);
+    std::get<2>(*t) = std::get<2>(bmk);
+    return ret;
+}
+
+std::shared_ptr<roaring::Roaring> DeleteBitmap::get_agg(const BitmapKey& bmk) const {
+    std::string key_str = agg_cache_key(_tablet_id, bmk); // Cache key container
+    CacheKey key(key_str);
+    Cache::Handle* handle = _agg_cache->repr()->lookup(key);
+
+    AggCache::Value* val =
+            handle == nullptr
+                    ? nullptr
+                    : reinterpret_cast<AggCache::Value*>(_agg_cache->repr()->value(handle));
+    // FIXME: do we need a mutex here to get rid of duplicated initializations
+    //        of cache entries in some cases?
+    if (val == nullptr) { // Renew if needed, put a new Value to cache
+        val = new AggCache::Value();
+        {
+            std::shared_lock l(lock);
+            DeleteBitmap::BitmapKey start {std::get<0>(bmk), std::get<1>(bmk), 0};
+            for (auto it = delete_bitmap.lower_bound(start); it != delete_bitmap.end(); ++it) {
+                auto& [k, bm] = *it;
+                if (std::get<0>(k) != std::get<0>(bmk) || std::get<1>(k) != std::get<1>(bmk) ||
+                    std::get<2>(k) > std::get<2>(bmk)) {
+                    break;
+                }
+                val->bitmap |= bm;
+            }
+        }
+        static auto deleter = [](const CacheKey& key, void* value) {
+            delete (AggCache::Value*)value; // Just delete to reclaim
+        };
+        size_t charge = val->bitmap.getSizeInBytes() + sizeof(AggCache::Value);
+        handle = _agg_cache->repr()->insert(key, val, charge, deleter, CachePriority::NORMAL);
+    }
+
+    // It is natural for the cache to reclaim the underlying memory
+    return std::shared_ptr<roaring::Roaring>(
+            &val->bitmap, [this, handle](...) { _agg_cache->repr()->release(handle); });
+}
+
+std::atomic<ShardedLRUCache*> DeleteBitmap::AggCache::s_repr {nullptr};
+
 } // namespace doris
diff --git a/be/src/olap/tablet_meta.h b/be/src/olap/tablet_meta.h
index 60713fa977..7275d0430d 100644
--- a/be/src/olap/tablet_meta.h
+++ b/be/src/olap/tablet_meta.h
@@ -17,6 +17,7 @@
 
 #pragma once
 
+#include <atomic>
 #include <cstddef>
 #include <mutex>
 #include <shared_mutex>
@@ -245,7 +246,7 @@ private:
     // which can avoid the merging cost in read stage, and accelerate the aggregation
     // query performance significantly.
     bool _enable_unique_key_merge_on_write = false;
-    std::unique_ptr<DeleteBitmap> _delete_bitmap;
+    std::shared_ptr<DeleteBitmap> _delete_bitmap;
 
     mutable std::shared_mutex _meta_lock;
 };
@@ -276,7 +277,11 @@ public:
     using BitmapKey = std::tuple<RowsetId, SegmentId, Version>;
     std::map<BitmapKey, roaring::Roaring> delete_bitmap; // Ordered map
 
-    DeleteBitmap();
+    /**
+     * 
+     * @param tablet_id the tablet which this delete bitmap associates with
+     */
+    DeleteBitmap(int64_t tablet_id);
 
     /**
      * Copy c-tor for making delete bitmap snapshot on read path
@@ -357,6 +362,52 @@ public:
      * @param other
      */
     void merge(const DeleteBitmap& other);
+
+    /**
+     * Checks if the given row is marked deleted in bitmap with the condition:
+     * all the bitmaps that
+     * RowsetId and SegmentId are the same as the given ones,
+     * and Version <= the given Version
+     *
+     * Note: aggregation cache may be used.
+     *
+     * @return true if marked deleted
+     */
+    bool contains_agg(const BitmapKey& bitmap, uint32_t row_id) const;
+
+    /**
+     * Gets aggregated delete_bitmap on rowset_id and version, the same effect:
+     * `select sum(roaring::Roaring) where RowsetId=rowset_id and SegmentId=seg_id and Version <= version`
+     *
+     * @return shared_ptr to a bitmap, which may be empty
+     */
+    std::shared_ptr<roaring::Roaring> get_agg(const BitmapKey& bmk) const;
+
+    class AggCache {
+    public:
+        struct Value {
+            roaring::Roaring bitmap;
+        };
+
+        AggCache(size_t size_in_bytes) {
+            static std::once_flag once;
+            std::call_once(once, [size_in_bytes] {
+                auto tmp = new ShardedLRUCache("DeleteBitmap AggCache", size_in_bytes,
+                                               LRUCacheType::SIZE, 2048);
+                AggCache::s_repr.store(tmp, std::memory_order_release);
+            });
+
+            while (!s_repr.load(std::memory_order_acquire)) {
+            }
+        }
+
+        static ShardedLRUCache* repr() { return s_repr.load(std::memory_order_acquire); }
+        static std::atomic<ShardedLRUCache*> s_repr;
+    };
+
+private:
+    mutable std::shared_ptr<AggCache> _agg_cache;
+    int64_t _tablet_id;
 };
 
 static const std::string SEQUENCE_COL = "__DORIS_SEQUENCE_COL__";
diff --git a/be/test/olap/tablet_meta_test.cpp b/be/test/olap/tablet_meta_test.cpp
index 28e8654b56..f8b83329ec 100644
--- a/be/test/olap/tablet_meta_test.cpp
+++ b/be/test/olap/tablet_meta_test.cpp
@@ -42,7 +42,7 @@ TEST(TabletMetaTest, SaveAndParse) {
 }
 
 TEST(TabletMetaTest, TestDeleteBitmap) {
-    std::unique_ptr<DeleteBitmap> dbmp(new DeleteBitmap());
+    std::unique_ptr<DeleteBitmap> dbmp(new DeleteBitmap(10086));
     auto gen1 = [&dbmp](int64_t max_rst_id, uint32_t max_seg_id, uint32_t max_row) {
         for (int64_t i = 0; i < max_rst_id; ++i) {
             for (uint32_t j = 0; j < max_seg_id; ++j) {
@@ -96,7 +96,7 @@ TEST(TabletMetaTest, TestDeleteBitmap) {
     }
 
     {
-        DeleteBitmap db_upper;
+        DeleteBitmap db_upper(10086);
         dbmp->subset({RowsetId {2, 0, 1, 1}, 1, 0}, {RowsetId {2, 0, 1, 1}, 1000000, 0}, &db_upper);
         roaring::Roaring d;
         ASSERT_EQ(db_upper.get({RowsetId {2, 0, 1, 1}, 1, 1}, &d), 0);
@@ -109,7 +109,7 @@ TEST(TabletMetaTest, TestDeleteBitmap) {
     {
         auto old_size = dbmp->delete_bitmap.size();
         // test merge
-        DeleteBitmap other;
+        DeleteBitmap other(10086);
         other.add({RowsetId {2, 0, 1, 1}, 1, 1}, 1100);
         dbmp->merge(other);
         ASSERT_EQ(dbmp->delete_bitmap.size(), old_size);
@@ -118,6 +118,55 @@ TEST(TabletMetaTest, TestDeleteBitmap) {
         dbmp->merge(other);
         ASSERT_EQ(dbmp->delete_bitmap.size(), old_size + 2);
     }
+
+    ////////////////////////////////////////////////////////////////////////////
+    // Cache test
+    ////////////////////////////////////////////////////////////////////////////
+    // Aggregation bitmap contains all row ids that are in versions smaller or
+    {
+        // equal to the given version, boundary test
+        auto bm = dbmp->get_agg({RowsetId {2, 0, 1, 1}, 1, 2});
+        ASSERT_EQ(bm->cardinality(), 1005);
+        ASSERT_TRUE(bm->contains(999));
+        ASSERT_TRUE(bm->contains(1100));
+        ASSERT_TRUE(bm->contains(1101));
+        ASSERT_TRUE(bm->contains(1102));
+        ASSERT_TRUE(bm->contains(1103));
+        ASSERT_TRUE(bm->contains(1104));
+        bm = dbmp->get_agg({RowsetId {2, 0, 1, 1}, 1, 2});
+        ASSERT_EQ(bm->cardinality(), 1005);
+    }
+
+    // Aggregation bitmap contains all row ids that are in versions smaller or
+    // equal to the given version, normal test
+    {
+        auto bm = dbmp->get_agg({RowsetId {2, 0, 1, 1}, 1, 1000});
+        ASSERT_EQ(bm->cardinality(), 1005);
+        ASSERT_TRUE(bm->contains(999));
+        ASSERT_TRUE(bm->contains(1100));
+        ASSERT_TRUE(bm->contains(1101));
+        ASSERT_TRUE(bm->contains(1102));
+        ASSERT_TRUE(bm->contains(1103));
+        ASSERT_TRUE(bm->contains(1104));
+        bm = dbmp->get_agg({RowsetId {2, 0, 1, 1}, 1, 1000});
+        ASSERT_EQ(bm->cardinality(), 1005);
+    }
+
+    // Check data is not messed-up
+    ASSERT_TRUE(dbmp->contains({RowsetId {2, 0, 1, 1}, 1, 2}, 1104));
+    ASSERT_FALSE(dbmp->contains({RowsetId {2, 0, 1, 1}, 1, 2}, 1103));
+    ASSERT_TRUE(dbmp->contains_agg({RowsetId {2, 0, 1, 1}, 1, 2}, 1104));
+    ASSERT_TRUE(dbmp->contains_agg({RowsetId {2, 0, 1, 1}, 1, 2}, 1103));
+
+    // Test c-tor of agg cache with global LRU
+    int cached_cardinality = dbmp->get_agg({RowsetId {2, 0, 1, 1}, 1, 2})->cardinality();
+    {
+        // New delete bitmap with old agg cache
+        std::unique_ptr<DeleteBitmap> dbmp(new DeleteBitmap(10086));
+        auto bm = dbmp->get_agg({RowsetId {2, 0, 1, 1}, 1, 2});
+        ASSERT_TRUE(bm->contains(1104));
+        ASSERT_EQ(bm->cardinality(), cached_cardinality);
+    }
 }
 
 } // namespace doris


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to