This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new a719d7a2226 [fix](memory) Fix LRU Cache of type `NUMBER` charge (#28175)
a719d7a2226 is described below
commit a719d7a2226d2c9e5aad43d805f027d6cefd820e
Author: Xinyi Zou <[email protected]>
AuthorDate: Wed Dec 13 11:15:57 2023 +0800
[fix](memory) Fix LRU Cache of type `NUMBER` charge (#28175)
---
be/src/olap/lru_cache.cpp | 9 +++------
be/src/olap/lru_cache.h | 21 ++++++++++++++-------
be/src/olap/schema_cache.h | 4 ++--
be/src/olap/segment_loader.cpp | 5 ++---
be/src/olap/txn_manager.cpp | 8 ++++----
be/src/olap/txn_manager.h | 3 +--
be/src/runtime/load_channel_mgr.cpp | 7 ++-----
be/src/runtime/load_channel_mgr.h | 3 +--
be/src/runtime/memory/lru_cache_policy.h | 6 +++---
be/src/service/point_query_executor.cpp | 2 +-
be/src/util/obj_lru_cache.cpp | 2 +-
be/src/util/obj_lru_cache.h | 4 ++--
be/test/olap/lru_cache_test.cpp | 2 +-
13 files changed, 37 insertions(+), 39 deletions(-)
diff --git a/be/src/olap/lru_cache.cpp b/be/src/olap/lru_cache.cpp
index a79e6fbf966..0de99e20b20 100644
--- a/be/src/olap/lru_cache.cpp
+++ b/be/src/olap/lru_cache.cpp
@@ -523,7 +523,9 @@ ShardedLRUCache::ShardedLRUCache(const std::string& name, size_t total_capacity,
_shards(nullptr),
_last_id(1),
_total_capacity(total_capacity) {
- _mem_tracker = std::make_unique<MemTrackerLimiter>(MemTrackerLimiter::Type::GLOBAL, name);
+ _mem_tracker = std::make_unique<MemTrackerLimiter>(
+ MemTrackerLimiter::Type::GLOBAL,
+ fmt::format("{}[{}]", name, lru_cache_type_string(type)));
CHECK(num_shards > 0) << "num_shards cannot be 0";
CHECK_EQ((num_shards & (num_shards - 1)), 0)
<< "num_shards should be power of two, but got " << num_shards;
@@ -665,9 +667,4 @@ void ShardedLRUCache::update_cache_metrics() const {
total_lookup_count == 0 ? 0 : ((double)total_hit_count /
total_lookup_count));
}
-Cache* new_lru_cache(const std::string& name, size_t capacity, LRUCacheType type,
- uint32_t num_shards) {
- return new ShardedLRUCache(name, capacity, type, num_shards);
-}
-
} // namespace doris
diff --git a/be/src/olap/lru_cache.h b/be/src/olap/lru_cache.h
index b07f3c31aa1..8608a85bf2d 100644
--- a/be/src/olap/lru_cache.h
+++ b/be/src/olap/lru_cache.h
@@ -58,11 +58,6 @@ enum LRUCacheType {
NUMBER // The capacity of cache is based on the number of cache entry.
};
-// Create a new cache with a specified name and capacity.
-// This implementation of Cache uses a least-recently-used eviction policy.
-extern Cache* new_lru_cache(const std::string& name, size_t capacity,
- LRUCacheType type = LRUCacheType::SIZE, uint32_t num_shards = 16);
-
class CacheKey {
public:
CacheKey() : _data(nullptr), _size(0) {}
@@ -394,8 +389,9 @@ private:
class ShardedLRUCache : public Cache {
public:
- explicit ShardedLRUCache(const std::string& name, size_t total_capacity, LRUCacheType type,
- uint32_t num_shards, uint32_t element_count_capacity = 0);
+ explicit ShardedLRUCache(const std::string& name, size_t total_capacity,
+ LRUCacheType type = LRUCacheType::SIZE, uint32_t num_shards = 16,
+ uint32_t element_count_capacity = 0);
explicit ShardedLRUCache(const std::string& name, size_t total_capacity,
LRUCacheType type,
uint32_t num_shards,
CacheValueTimeExtractor
cache_value_time_extractor,
@@ -421,6 +417,17 @@ public:
private:
void update_cache_metrics() const;
+ static std::string lru_cache_type_string(LRUCacheType type) {
+ switch (type) {
+ case LRUCacheType::SIZE:
+ return "size";
+ case LRUCacheType::NUMBER:
+ return "number";
+ default:
+ LOG(FATAL) << "not match type of lru cache:" << static_cast<int>(type);
+ }
+ }
+
private:
static uint32_t _hash_slice(const CacheKey& s);
uint32_t _shard(uint32_t hash) {
diff --git a/be/src/olap/schema_cache.h b/be/src/olap/schema_cache.h
index dbce5336d8a..b7f640551d4 100644
--- a/be/src/olap/schema_cache.h
+++ b/be/src/olap/schema_cache.h
@@ -100,8 +100,8 @@ public:
CacheValue* cache_value = (CacheValue*)value;
delete cache_value;
};
- auto lru_handle = _cache->insert(key, value, sizeof(CacheValue), deleter,
- CachePriority::NORMAL, schema->mem_size());
+ auto lru_handle =
+ _cache->insert(key, value, 1, deleter, CachePriority::NORMAL, schema->mem_size());
_cache->release(lru_handle);
}
diff --git a/be/src/olap/segment_loader.cpp b/be/src/olap/segment_loader.cpp
index 8d759cce8ee..02539a3f169 100644
--- a/be/src/olap/segment_loader.cpp
+++ b/be/src/olap/segment_loader.cpp
@@ -45,9 +45,8 @@ void SegmentCache::insert(const SegmentCache::CacheKey& key, SegmentCache::Cache
delete cache_value;
};
- auto lru_handle =
- _cache->insert(key.encode(), &value, sizeof(SegmentCache::CacheValue), deleter,
- CachePriority::NORMAL, value.segment->meta_mem_usage());
+ auto lru_handle = _cache->insert(key.encode(), &value, 1, deleter, CachePriority::NORMAL,
+ value.segment->meta_mem_usage());
handle->push_segment(_cache.get(), lru_handle);
}
diff --git a/be/src/olap/txn_manager.cpp b/be/src/olap/txn_manager.cpp
index 1ed6f74eb88..6ed2f98a327 100644
--- a/be/src/olap/txn_manager.cpp
+++ b/be/src/olap/txn_manager.cpp
@@ -116,8 +116,8 @@ TxnManager::TxnManager(int32_t txn_map_shard_size, int32_t txn_shard_size)
_txn_tablet_delta_writer_map = new txn_tablet_delta_writer_map_t[_txn_map_shard_size];
_txn_tablet_delta_writer_map_locks = new std::shared_mutex[_txn_map_shard_size];
// For debugging
- _tablet_version_cache =
- new ShardedLRUCache("TabletVersionCache", 100000, LRUCacheType::NUMBER, 32);
+ _tablet_version_cache = std::unique_ptr<Cache>(
+ new ShardedLRUCache("TabletVersionCache", 100000, LRUCacheType::NUMBER, 32));
}
// prepare txn should always be allowed because ingest task will be retried
@@ -914,8 +914,8 @@ void TxnManager::update_tablet_version_txn(int64_t tablet_id, int64_t version, i
delete cache_value;
};
- auto handle = _tablet_version_cache->insert(cache_key, value, sizeof(txn_id), deleter,
- CachePriority::NORMAL, sizeof(txn_id));
+ auto handle = _tablet_version_cache->insert(cache_key, value, 1, deleter, CachePriority::NORMAL,
+ sizeof(txn_id));
_tablet_version_cache->release(handle);
}
diff --git a/be/src/olap/txn_manager.h b/be/src/olap/txn_manager.h
index 73ef0cf6e81..74ad589cba0 100644
--- a/be/src/olap/txn_manager.h
+++ b/be/src/olap/txn_manager.h
@@ -85,7 +85,6 @@ public:
delete[] _txn_mutex;
delete[] _txn_tablet_delta_writer_map;
delete[] _txn_tablet_delta_writer_map_locks;
- delete _tablet_version_cache;
}
// add a txn to manager
@@ -239,7 +238,7 @@ private:
std::shared_mutex* _txn_mutex = nullptr;
txn_tablet_delta_writer_map_t* _txn_tablet_delta_writer_map = nullptr;
- ShardedLRUCache* _tablet_version_cache = nullptr;
+ std::unique_ptr<Cache> _tablet_version_cache;
std::shared_mutex* _txn_tablet_delta_writer_map_locks = nullptr;
DISALLOW_COPY_AND_ASSIGN(TxnManager);
}; // TxnManager
diff --git a/be/src/runtime/load_channel_mgr.cpp b/be/src/runtime/load_channel_mgr.cpp
index 88f007e1790..82001b74119 100644
--- a/be/src/runtime/load_channel_mgr.cpp
+++ b/be/src/runtime/load_channel_mgr.cpp
@@ -72,10 +72,6 @@ LoadChannelMgr::LoadChannelMgr() : _stop_background_threads_latch(1) {
});
}
-LoadChannelMgr::~LoadChannelMgr() {
- delete _last_success_channel;
-}
-
void LoadChannelMgr::stop() {
DEREGISTER_HOOK_METRIC(load_channel_count);
DEREGISTER_HOOK_METRIC(load_channel_mem_consumption);
@@ -86,7 +82,8 @@ void LoadChannelMgr::stop() {
}
Status LoadChannelMgr::init(int64_t process_mem_limit) {
- _last_success_channel = new_lru_cache("LastestSuccessChannelCache", 1024);
+ _last_success_channel =
+ std::unique_ptr<Cache>(new ShardedLRUCache("LastestSuccessChannelCache", 1024));
RETURN_IF_ERROR(_start_bg_worker());
return Status::OK();
}
diff --git a/be/src/runtime/load_channel_mgr.h b/be/src/runtime/load_channel_mgr.h
index d34cb69a9e6..0aeec52f245 100644
--- a/be/src/runtime/load_channel_mgr.h
+++ b/be/src/runtime/load_channel_mgr.h
@@ -49,7 +49,6 @@ class Thread;
class LoadChannelMgr {
public:
LoadChannelMgr();
- ~LoadChannelMgr();
Status init(int64_t process_mem_limit);
@@ -77,7 +76,7 @@ protected:
std::mutex _lock;
// load id -> load channel
std::unordered_map<UniqueId, std::shared_ptr<LoadChannel>> _load_channels;
- Cache* _last_success_channel = nullptr;
+ std::unique_ptr<Cache> _last_success_channel;
MemTableMemoryLimiter* _memtable_memory_limiter = nullptr;
diff --git a/be/src/runtime/memory/lru_cache_policy.h b/be/src/runtime/memory/lru_cache_policy.h
index e7b9680eb60..bd301cd47b5 100644
--- a/be/src/runtime/memory/lru_cache_policy.h
+++ b/be/src/runtime/memory/lru_cache_policy.h
@@ -41,9 +41,9 @@ public:
: CachePolicy(type, stale_sweep_time_s) {
_cache = num_shards == -1
? std::unique_ptr<Cache>(
- new_lru_cache(type_string(type), capacity, lru_cache_type))
- : std::unique_ptr<Cache>(new_lru_cache(type_string(type), capacity,
- lru_cache_type, num_shards));
+ new ShardedLRUCache(type_string(type), capacity, lru_cache_type))
+ : std::unique_ptr<Cache>(new ShardedLRUCache(type_string(type), capacity,
+ lru_cache_type, num_shards));
}
~LRUCachePolicy() override = default;
diff --git a/be/src/service/point_query_executor.cpp b/be/src/service/point_query_executor.cpp
index 4974bcc40b4..6a0000a3c4a 100644
--- a/be/src/service/point_query_executor.cpp
+++ b/be/src/service/point_query_executor.cpp
@@ -114,7 +114,7 @@ LookupConnectionCache* LookupConnectionCache::create_global_instance(size_t capa
RowCache::RowCache(int64_t capacity, int num_shards) {
// Create Row Cache
_cache = std::unique_ptr<Cache>(
- new_lru_cache("RowCache", capacity, LRUCacheType::SIZE, num_shards));
+ new ShardedLRUCache("RowCache", capacity, LRUCacheType::SIZE, num_shards));
}
// Create global instance of this class
diff --git a/be/src/util/obj_lru_cache.cpp b/be/src/util/obj_lru_cache.cpp
index 30720887457..b1f2b802583 100644
--- a/be/src/util/obj_lru_cache.cpp
+++ b/be/src/util/obj_lru_cache.cpp
@@ -23,7 +23,7 @@ ObjLRUCache::ObjLRUCache(int64_t capacity, uint32_t num_shards) {
_enabled = (capacity > 0);
if (_enabled) {
_cache = std::unique_ptr<Cache>(
- new_lru_cache("ObjLRUCache", capacity, LRUCacheType::NUMBER, num_shards));
+ new ShardedLRUCache("ObjLRUCache", capacity, LRUCacheType::NUMBER, num_shards));
}
}
diff --git a/be/src/util/obj_lru_cache.h b/be/src/util/obj_lru_cache.h
index db6e937e180..d280567650c 100644
--- a/be/src/util/obj_lru_cache.h
+++ b/be/src/util/obj_lru_cache.h
@@ -85,8 +85,8 @@ public:
void (*deleter)(const CacheKey& key, void* value)) {
if (_enabled) {
const std::string& encoded_key = key.key;
- auto handle = _cache->insert(encoded_key, (void*)value, sizeof(T), deleter,
- CachePriority::NORMAL, 1);
+ auto handle = _cache->insert(encoded_key, (void*)value, 1, deleter,
+ CachePriority::NORMAL, sizeof(T));
*cache_handle = CacheHandle {_cache.get(), handle};
} else {
cache_handle = nullptr;
diff --git a/be/test/olap/lru_cache_test.cpp b/be/test/olap/lru_cache_test.cpp
index f07482f0083..4f9c661e43d 100644
--- a/be/test/olap/lru_cache_test.cpp
+++ b/be/test/olap/lru_cache_test.cpp
@@ -80,7 +80,7 @@ public:
std::vector<int> _deleted_values;
Cache* _cache;
- CacheTest() : _cache(new_lru_cache("test", kCacheSize)) { _s_current = this; }
+ CacheTest() : _cache(new ShardedLRUCache("test", kCacheSize)) { _s_current = this; }
~CacheTest() { delete _cache; }
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]