This is an automated email from the ASF dual-hosted git repository.
gaodayue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push:
new 414c5a8 [fix] LRUCache::prune_if may not remove all the entries
matching the predicate (#7383)
414c5a8 is described below
commit 414c5a8b5a63f517af8d31a6116e82182d4de863
Author: Dayue Gao <[email protected]>
AuthorDate: Mon Dec 13 21:09:47 2021 +0800
[fix] LRUCache::prune_if may not remove all the entries matching the
predicate (#7383)
[fix] LRUCache::prune_if may not remove all the entries matching the
predicate
Co-authored-by: gaodayue <[email protected]>
---
be/src/olap/lru_cache.cpp | 46 ++++++++++++++---------------------------
be/src/olap/lru_cache.h | 10 +++++----
be/src/olap/segment_loader.cpp | 30 +++++++++++++++------------
be/test/olap/lru_cache_test.cpp | 19 ++++++++++-------
4 files changed, 50 insertions(+), 55 deletions(-)
diff --git a/be/src/olap/lru_cache.cpp b/be/src/olap/lru_cache.cpp
index 3ae16db..ca73ab1 100644
--- a/be/src/olap/lru_cache.cpp
+++ b/be/src/olap/lru_cache.cpp
@@ -367,29 +367,19 @@ void LRUCache::erase(const CacheKey& key, uint32_t hash) {
}
}
-void LRUCache::_prune_one(LRUHandle* old) {
- DCHECK(old->in_cache);
- DCHECK(old->refs == 1); // LRU list contains elements which may be evicted
- _lru_remove(old);
- _table.remove(old);
- old->in_cache = false;
- _unref(old);
- _usage -= old->total_size;
-}
-
int64_t LRUCache::prune() {
LRUHandle* to_remove_head = nullptr;
{
MutexLock l(&_mutex);
while (_lru_normal.next != &_lru_normal) {
LRUHandle* old = _lru_normal.next;
- _prune_one(old);
+ _evict_one_entry(old);
old->next = to_remove_head;
to_remove_head = old;
}
while (_lru_durable.next != &_lru_durable) {
LRUHandle* old = _lru_durable.next;
- _prune_one(old);
+ _evict_one_entry(old);
old->next = to_remove_head;
to_remove_head = old;
}
@@ -404,34 +394,30 @@ int64_t LRUCache::prune() {
return pruned_count;
}
-int64_t LRUCache::prune_if(bool (*pred)(const void* value)) {
+int64_t LRUCache::prune_if(CacheValuePredicate pred) {
LRUHandle* to_remove_head = nullptr;
{
MutexLock l(&_mutex);
LRUHandle* p = _lru_normal.next;
while (p != &_lru_normal) {
- LRUHandle* old = _lru_normal.next;
- if (pred(old->value)) {
- _prune_one(old);
- old->next = to_remove_head;
- to_remove_head = old;
- p = _lru_normal.next;
- } else {
- p = p->next;
+ LRUHandle* next = p->next;
+ if (pred(p->value)) {
+ _evict_one_entry(p);
+ p->next = to_remove_head;
+ to_remove_head = p;
}
+ p = next;
}
p = _lru_durable.next;
while (p != &_lru_durable) {
- LRUHandle* old = _lru_durable.next;
- if (pred(old->value)) {
- _prune_one(old);
- old->next = to_remove_head;
- to_remove_head = old;
- p = _lru_durable.next;
- } else {
- p = p->next;
+ LRUHandle* next = p->next;
+ if (pred(p->value)) {
+ _evict_one_entry(p);
+ p->next = to_remove_head;
+ to_remove_head = p;
}
+ p = next;
}
}
int64_t pruned_count = 0;
@@ -527,7 +513,7 @@ int64_t ShardedLRUCache::prune() {
return num_prune;
}
-int64_t ShardedLRUCache::prune_if(bool (*pred)(const void* value)) {
+int64_t ShardedLRUCache::prune_if(CacheValuePredicate pred) {
int64_t num_prune = 0;
for (int s = 0; s < kNumShards; s++) {
num_prune += _shards[s]->prune_if(pred);
diff --git a/be/src/olap/lru_cache.h b/be/src/olap/lru_cache.h
index b9f71f1..2ea6bda 100644
--- a/be/src/olap/lru_cache.h
+++ b/be/src/olap/lru_cache.h
@@ -10,6 +10,7 @@
#include <stdint.h>
#include <string.h>
+#include <functional>
#include <string>
#include <vector>
@@ -148,6 +149,8 @@ private:
// The entry with smaller CachePriority will evict firstly
enum class CachePriority { NORMAL = 0, DURABLE = 1 };
+using CacheValuePredicate = std::function<bool(const void*)>;
+
class Cache {
public:
Cache() {}
@@ -216,7 +219,7 @@ public:
// Same as prune(), but the entry will only be pruned if the predicate
matched.
// NOTICE: the predicate should be simple enough, or the prune_if()
function
// may hold lock for a long time to execute predicate.
- virtual int64_t prune_if(bool (*pred)(const void* value)) { return 0; }
+ virtual int64_t prune_if(CacheValuePredicate pred) { return 0; }
private:
DISALLOW_COPY_AND_ASSIGN(Cache);
@@ -317,7 +320,7 @@ public:
void release(Cache::Handle* handle);
void erase(const CacheKey& key, uint32_t hash);
int64_t prune();
- int64_t prune_if(bool (*pred)(const void* value));
+ int64_t prune_if(CacheValuePredicate pred);
uint64_t get_lookup_count() const { return _lookup_count; }
uint64_t get_hit_count() const { return _hit_count; }
@@ -330,7 +333,6 @@ private:
bool _unref(LRUHandle* e);
void _evict_from_lru(size_t total_size, LRUHandle** to_remove_head);
void _evict_one_entry(LRUHandle* e);
- void _prune_one(LRUHandle* old);
private:
LRUCacheType _type;
@@ -374,7 +376,7 @@ public:
Slice value_slice(Handle* handle) override;
virtual uint64_t new_id();
virtual int64_t prune();
- virtual int64_t prune_if(bool (*pred)(const void* value));
+ virtual int64_t prune_if(CacheValuePredicate pred);
private:
void update_cache_metrics() const;
diff --git a/be/src/olap/segment_loader.cpp b/be/src/olap/segment_loader.cpp
index 872279f..198b4b4 100644
--- a/be/src/olap/segment_loader.cpp
+++ b/be/src/olap/segment_loader.cpp
@@ -31,8 +31,10 @@ void SegmentLoader::create_global_instance(size_t capacity) {
}
SegmentLoader::SegmentLoader(size_t capacity)
- : _mem_tracker(MemTracker::CreateTracker(capacity, "SegmentLoader",
nullptr, true, true, MemTrackerLevel::OVERVIEW)) {
- _cache = std::unique_ptr<Cache>(new_typed_lru_cache("SegmentCache",
capacity, LRUCacheType::NUMBER, _mem_tracker));
+ : _mem_tracker(MemTracker::CreateTracker(capacity, "SegmentLoader",
nullptr, true, true,
+ MemTrackerLevel::OVERVIEW)) {
+ _cache = std::unique_ptr<Cache>(
+ new_typed_lru_cache("SegmentCache", capacity,
LRUCacheType::NUMBER, _mem_tracker));
}
bool SegmentLoader::_lookup(const SegmentLoader::CacheKey& key,
SegmentCacheHandle* handle) {
@@ -44,20 +46,21 @@ bool SegmentLoader::_lookup(const SegmentLoader::CacheKey&
key, SegmentCacheHand
return true;
}
-void SegmentLoader::_insert(const SegmentLoader::CacheKey& key,
SegmentLoader::CacheValue& value, SegmentCacheHandle* handle) {
+void SegmentLoader::_insert(const SegmentLoader::CacheKey& key,
SegmentLoader::CacheValue& value,
+ SegmentCacheHandle* handle) {
auto deleter = [](const doris::CacheKey& key, void* value) {
- SegmentLoader::CacheValue* cache_value = (SegmentLoader::CacheValue*)
value;
+ SegmentLoader::CacheValue* cache_value =
(SegmentLoader::CacheValue*)value;
cache_value->segments.clear();
delete cache_value;
};
- auto lru_handle = _cache->insert(key.encode(), &value,
sizeof(SegmentLoader::CacheValue), deleter, CachePriority::NORMAL);
+ auto lru_handle = _cache->insert(key.encode(), &value,
sizeof(SegmentLoader::CacheValue),
+ deleter, CachePriority::NORMAL);
*handle = SegmentCacheHandle(_cache.get(), lru_handle);
}
OLAPStatus SegmentLoader::load_segments(const BetaRowsetSharedPtr& rowset,
- SegmentCacheHandle* cache_handle,
- bool use_cache) {
+ SegmentCacheHandle* cache_handle, bool
use_cache) {
SegmentLoader::CacheKey cache_key(rowset->rowset_id());
if (_lookup(cache_key, cache_handle)) {
cache_handle->owned = false;
@@ -81,17 +84,18 @@ OLAPStatus SegmentLoader::load_segments(const
BetaRowsetSharedPtr& rowset,
}
OLAPStatus SegmentLoader::prune() {
- bool (*pred)(const void* value) = [](const void* value) -> bool {
- int64_t curtime = UnixMillis();
- SegmentLoader::CacheValue* cache_value = (SegmentLoader::CacheValue*)
value;
- return curtime - cache_value->last_visit_time >
config::tablet_rowset_stale_sweep_time_sec * 1000;
+ const int64_t curtime = UnixMillis();
+ auto pred = [curtime](const void* value) -> bool {
+ SegmentLoader::CacheValue* cache_value =
(SegmentLoader::CacheValue*)value;
+ return (cache_value->last_visit_time +
config::tablet_rowset_stale_sweep_time_sec * 1000) <
+ curtime;
};
MonotonicStopWatch watch;
watch.start();
int64_t prune_num = _cache->prune_if(pred);
- LOG(INFO) << "prune " << prune_num << " entries in segment cache.
cost(ms): "
- << watch.elapsed_time() / 1000 / 1000;
+ LOG(INFO) << "prune " << prune_num
+ << " entries in segment cache. cost(ms): " <<
watch.elapsed_time() / 1000 / 1000;
return OLAP_SUCCESS;
}
diff --git a/be/test/olap/lru_cache_test.cpp b/be/test/olap/lru_cache_test.cpp
index e993439..d71fdeb 100644
--- a/be/test/olap/lru_cache_test.cpp
+++ b/be/test/olap/lru_cache_test.cpp
@@ -95,7 +95,7 @@ public:
void Insert(int key, int value, int charge) {
std::string result;
_cache->release(_cache->insert(EncodeKey(&result, key),
EncodeValue(value), charge,
- &CacheTest::Deleter));
+ &CacheTest::Deleter));
}
void InsertDurable(int key, int value, int charge) {
@@ -298,20 +298,23 @@ TEST_F(CacheTest, Prune) {
insert_LRUCache(cache, key7, 700, CachePriority::DURABLE);
ASSERT_EQ(5, cache.get_usage());
- auto pred = [](const void* value) -> bool {
- return false;
- };
+ auto pred = [](const void* value) -> bool { return false; };
cache.prune_if(pred);
ASSERT_EQ(5, cache.get_usage());
- auto pred2 = [](const void* value) -> bool {
- return true;
- };
+ auto pred2 = [](const void* value) -> bool { return
DecodeValue((void*)value) > 400; };
cache.prune_if(pred2);
- ASSERT_EQ(0, cache.get_usage());
+ ASSERT_EQ(2, cache.get_usage());
cache.prune();
ASSERT_EQ(0, cache.get_usage());
+
+ for (int i = 1; i <= 5; ++i) {
+ insert_LRUCache(cache, CacheKey {std::to_string(i)}, i,
CachePriority::NORMAL);
+ ASSERT_EQ(i, cache.get_usage());
+ }
+ cache.prune_if([](const void*) { return true; });
+ ASSERT_EQ(0, cache.get_usage());
}
TEST_F(CacheTest, HeavyEntries) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]