This is an automated email from the ASF dual-hosted git repository.

gaodayue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 414c5a8  [fix] LRUCache::prune_if may not remove all the entries matching the predicate (#7383)
414c5a8 is described below

commit 414c5a8b5a63f517af8d31a6116e82182d4de863
Author: Dayue Gao <[email protected]>
AuthorDate: Mon Dec 13 21:09:47 2021 +0800

    [fix] LRUCache::prune_if may not remove all the entries matching the predicate (#7383)
    
    [fix] LRUCache::prune_if may not remove all the entries matching the predicate
    
    Co-authored-by: gaodayue <[email protected]>
---
 be/src/olap/lru_cache.cpp       | 46 ++++++++++++++---------------------------
 be/src/olap/lru_cache.h         | 10 +++++----
 be/src/olap/segment_loader.cpp  | 30 +++++++++++++++------------
 be/test/olap/lru_cache_test.cpp | 19 ++++++++++-------
 4 files changed, 50 insertions(+), 55 deletions(-)

diff --git a/be/src/olap/lru_cache.cpp b/be/src/olap/lru_cache.cpp
index 3ae16db..ca73ab1 100644
--- a/be/src/olap/lru_cache.cpp
+++ b/be/src/olap/lru_cache.cpp
@@ -367,29 +367,19 @@ void LRUCache::erase(const CacheKey& key, uint32_t hash) {
     }
 }
 
-void LRUCache::_prune_one(LRUHandle* old) {
-    DCHECK(old->in_cache);
-    DCHECK(old->refs == 1); // LRU list contains elements which may be evicted
-    _lru_remove(old);
-    _table.remove(old);
-    old->in_cache = false;
-    _unref(old);
-    _usage -= old->total_size;
-}
-
 int64_t LRUCache::prune() {
     LRUHandle* to_remove_head = nullptr;
     {
         MutexLock l(&_mutex);
         while (_lru_normal.next != &_lru_normal) {
             LRUHandle* old = _lru_normal.next;
-            _prune_one(old);
+            _evict_one_entry(old);
             old->next = to_remove_head;
             to_remove_head = old;
         }
         while (_lru_durable.next != &_lru_durable) {
             LRUHandle* old = _lru_durable.next;
-            _prune_one(old);
+            _evict_one_entry(old);
             old->next = to_remove_head;
             to_remove_head = old;
         }
@@ -404,34 +394,30 @@ int64_t LRUCache::prune() {
     return pruned_count;
 }
 
-int64_t LRUCache::prune_if(bool (*pred)(const void* value)) {
+int64_t LRUCache::prune_if(CacheValuePredicate pred) {
     LRUHandle* to_remove_head = nullptr;
     {
         MutexLock l(&_mutex);
         LRUHandle* p = _lru_normal.next;
         while (p != &_lru_normal) {
-            LRUHandle* old = _lru_normal.next;
-            if (pred(old->value)) {
-                _prune_one(old);
-                old->next = to_remove_head;
-                to_remove_head = old;
-                p = _lru_normal.next;
-            } else {
-                p = p->next;
+            LRUHandle* next = p->next;
+            if (pred(p->value)) {
+                _evict_one_entry(p);
+                p->next = to_remove_head;
+                to_remove_head = p;
             }
+            p = next;
         }
 
         p = _lru_durable.next;
         while (p != &_lru_durable) {
-            LRUHandle* old = _lru_durable.next;
-            if (pred(old->value)) {
-                _prune_one(old);
-                old->next = to_remove_head;
-                to_remove_head = old;
-                p = _lru_durable.next;
-            } else {
-                p = p->next;
+            LRUHandle* next = p->next;
+            if (pred(p->value)) {
+                _evict_one_entry(p);
+                p->next = to_remove_head;
+                to_remove_head = p;
             }
+            p = next;
         }
     }
     int64_t pruned_count = 0;
@@ -527,7 +513,7 @@ int64_t ShardedLRUCache::prune() {
     return num_prune;
 }
 
-int64_t ShardedLRUCache::prune_if(bool (*pred)(const void* value)) {
+int64_t ShardedLRUCache::prune_if(CacheValuePredicate pred) {
     int64_t num_prune = 0;
     for (int s = 0; s < kNumShards; s++) {
         num_prune += _shards[s]->prune_if(pred);
diff --git a/be/src/olap/lru_cache.h b/be/src/olap/lru_cache.h
index b9f71f1..2ea6bda 100644
--- a/be/src/olap/lru_cache.h
+++ b/be/src/olap/lru_cache.h
@@ -10,6 +10,7 @@
 #include <stdint.h>
 #include <string.h>
 
+#include <functional>
 #include <string>
 #include <vector>
 
@@ -148,6 +149,8 @@ private:
 // The entry with smaller CachePriority will evict firstly
 enum class CachePriority { NORMAL = 0, DURABLE = 1 };
 
+using CacheValuePredicate = std::function<bool(const void*)>;
+
 class Cache {
 public:
     Cache() {}
@@ -216,7 +219,7 @@ public:
     // Same as prune(), but the entry will only be pruned if the predicate matched.
     // NOTICE: the predicate should be simple enough, or the prune_if() function
     // may hold lock for a long time to execute predicate.
-    virtual int64_t prune_if(bool (*pred)(const void* value)) { return 0; }
+    virtual int64_t prune_if(CacheValuePredicate pred) { return 0; }
 
 private:
     DISALLOW_COPY_AND_ASSIGN(Cache);
@@ -317,7 +320,7 @@ public:
     void release(Cache::Handle* handle);
     void erase(const CacheKey& key, uint32_t hash);
     int64_t prune();
-    int64_t prune_if(bool (*pred)(const void* value));
+    int64_t prune_if(CacheValuePredicate pred);
 
     uint64_t get_lookup_count() const { return _lookup_count; }
     uint64_t get_hit_count() const { return _hit_count; }
@@ -330,7 +333,6 @@ private:
     bool _unref(LRUHandle* e);
     void _evict_from_lru(size_t total_size, LRUHandle** to_remove_head);
     void _evict_one_entry(LRUHandle* e);
-    void _prune_one(LRUHandle* old);
 
 private:
     LRUCacheType _type;
@@ -374,7 +376,7 @@ public:
     Slice value_slice(Handle* handle) override;
     virtual uint64_t new_id();
     virtual int64_t prune();
-    virtual int64_t prune_if(bool (*pred)(const void* value));
+    virtual int64_t prune_if(CacheValuePredicate pred);
 
 private:
     void update_cache_metrics() const;
diff --git a/be/src/olap/segment_loader.cpp b/be/src/olap/segment_loader.cpp
index 872279f..198b4b4 100644
--- a/be/src/olap/segment_loader.cpp
+++ b/be/src/olap/segment_loader.cpp
@@ -31,8 +31,10 @@ void SegmentLoader::create_global_instance(size_t capacity) {
 }
 
 SegmentLoader::SegmentLoader(size_t capacity)
-    : _mem_tracker(MemTracker::CreateTracker(capacity, "SegmentLoader", 
nullptr, true, true, MemTrackerLevel::OVERVIEW)) {
-        _cache = std::unique_ptr<Cache>(new_typed_lru_cache("SegmentCache", 
capacity, LRUCacheType::NUMBER, _mem_tracker));
+        : _mem_tracker(MemTracker::CreateTracker(capacity, "SegmentLoader", 
nullptr, true, true,
+                                                 MemTrackerLevel::OVERVIEW)) {
+    _cache = std::unique_ptr<Cache>(
+            new_typed_lru_cache("SegmentCache", capacity, 
LRUCacheType::NUMBER, _mem_tracker));
 }
 
 bool SegmentLoader::_lookup(const SegmentLoader::CacheKey& key, 
SegmentCacheHandle* handle) {
@@ -44,20 +46,21 @@ bool SegmentLoader::_lookup(const SegmentLoader::CacheKey& 
key, SegmentCacheHand
     return true;
 }
 
-void SegmentLoader::_insert(const SegmentLoader::CacheKey& key, 
SegmentLoader::CacheValue& value, SegmentCacheHandle* handle) {
+void SegmentLoader::_insert(const SegmentLoader::CacheKey& key, 
SegmentLoader::CacheValue& value,
+                            SegmentCacheHandle* handle) {
     auto deleter = [](const doris::CacheKey& key, void* value) {
-        SegmentLoader::CacheValue* cache_value = (SegmentLoader::CacheValue*) 
value;
+        SegmentLoader::CacheValue* cache_value = 
(SegmentLoader::CacheValue*)value;
         cache_value->segments.clear();
         delete cache_value;
     };
 
-    auto lru_handle = _cache->insert(key.encode(), &value, 
sizeof(SegmentLoader::CacheValue), deleter, CachePriority::NORMAL);
+    auto lru_handle = _cache->insert(key.encode(), &value, 
sizeof(SegmentLoader::CacheValue),
+                                     deleter, CachePriority::NORMAL);
     *handle = SegmentCacheHandle(_cache.get(), lru_handle);
 }
 
 OLAPStatus SegmentLoader::load_segments(const BetaRowsetSharedPtr& rowset,
-                                        SegmentCacheHandle* cache_handle,
-                                        bool use_cache) {
+                                        SegmentCacheHandle* cache_handle, bool 
use_cache) {
     SegmentLoader::CacheKey cache_key(rowset->rowset_id());
     if (_lookup(cache_key, cache_handle)) {
         cache_handle->owned = false;
@@ -81,17 +84,18 @@ OLAPStatus SegmentLoader::load_segments(const 
BetaRowsetSharedPtr& rowset,
 }
 
 OLAPStatus SegmentLoader::prune() {
-    bool (*pred)(const void* value) = [](const void* value) -> bool {
-        int64_t curtime = UnixMillis();
-        SegmentLoader::CacheValue* cache_value = (SegmentLoader::CacheValue*) 
value;
-        return curtime - cache_value->last_visit_time > 
config::tablet_rowset_stale_sweep_time_sec * 1000;
+    const int64_t curtime = UnixMillis();
+    auto pred = [curtime](const void* value) -> bool {
+        SegmentLoader::CacheValue* cache_value = 
(SegmentLoader::CacheValue*)value;
+        return (cache_value->last_visit_time + 
config::tablet_rowset_stale_sweep_time_sec * 1000) <
+               curtime;
     };
 
     MonotonicStopWatch watch;
     watch.start();
     int64_t prune_num = _cache->prune_if(pred);
-    LOG(INFO) << "prune " << prune_num << " entries in segment cache. 
cost(ms): "
-            << watch.elapsed_time() / 1000 / 1000;
+    LOG(INFO) << "prune " << prune_num
+              << " entries in segment cache. cost(ms): " << 
watch.elapsed_time() / 1000 / 1000;
     return OLAP_SUCCESS;
 }
 
diff --git a/be/test/olap/lru_cache_test.cpp b/be/test/olap/lru_cache_test.cpp
index e993439..d71fdeb 100644
--- a/be/test/olap/lru_cache_test.cpp
+++ b/be/test/olap/lru_cache_test.cpp
@@ -95,7 +95,7 @@ public:
     void Insert(int key, int value, int charge) {
         std::string result;
         _cache->release(_cache->insert(EncodeKey(&result, key), 
EncodeValue(value), charge,
-                    &CacheTest::Deleter));
+                                       &CacheTest::Deleter));
     }
 
     void InsertDurable(int key, int value, int charge) {
@@ -298,20 +298,23 @@ TEST_F(CacheTest, Prune) {
     insert_LRUCache(cache, key7, 700, CachePriority::DURABLE);
     ASSERT_EQ(5, cache.get_usage());
 
-    auto pred = [](const void* value) -> bool {
-        return false;
-    };
+    auto pred = [](const void* value) -> bool { return false; };
     cache.prune_if(pred);
     ASSERT_EQ(5, cache.get_usage());
 
-    auto pred2 = [](const void* value) -> bool {
-        return true;
-    };
+    auto pred2 = [](const void* value) -> bool { return 
DecodeValue((void*)value) > 400; };
     cache.prune_if(pred2);
-    ASSERT_EQ(0, cache.get_usage());
+    ASSERT_EQ(2, cache.get_usage());
 
     cache.prune();
     ASSERT_EQ(0, cache.get_usage());
+
+    for (int i = 1; i <= 5; ++i) {
+        insert_LRUCache(cache, CacheKey {std::to_string(i)}, i, 
CachePriority::NORMAL);
+        ASSERT_EQ(i, cache.get_usage());
+    }
+    cache.prune_if([](const void*) { return true; });
+    ASSERT_EQ(0, cache.get_usage());
 }
 
 TEST_F(CacheTest, HeavyEntries) {

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to