This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new f7aa74e62c6 [fix](filecache) reset_range does not update shadow queue
causing large cache size (#59314)
f7aa74e62c6 is described below
commit f7aa74e62c634ebc4779707462253ac2c5dec75d
Author: zhengyu <[email protected]>
AuthorDate: Wed Jan 7 08:09:48 2026 +0800
[fix](filecache) reset_range does not update shadow queue causing large
cache size (#59314)
The shadow queue is a copy of the actual LRU queue that provides lockless
access. However, the copy misses size updates when the actual LRU queue
resets a block's range (when loading data, we first allocate a 1MB block
for the data and reset its size to the real size when finalizing).
This commit does the following to fix this problem:
1. update the corresponding shadow queue element when resetting the range
2. calibrate the size during the initial load into memory
Signed-off-by: zhengyu <[email protected]>
---
be/src/io/cache/block_file_cache.cpp | 26 ++++++--
be/src/io/cache/file_cache_common.h | 2 +
be/src/io/cache/fs_file_cache_storage.cpp | 41 ++++++++++---
be/src/io/cache/fs_file_cache_storage.h | 4 ++
be/src/io/cache/lru_queue_recorder.cpp | 9 +++
be/src/io/cache/lru_queue_recorder.h | 3 +-
be/src/util/runtime_profile.h | 8 +--
.../io/cache/block_file_cache_test_meta_store.cpp | 69 ++++++++++++++++++++++
be/test/io/cache/lru_queue_test.cpp | 11 ++++
9 files changed, 156 insertions(+), 17 deletions(-)
diff --git a/be/src/io/cache/block_file_cache.cpp
b/be/src/io/cache/block_file_cache.cpp
index 4ee923e080e..135e87baa7b 100644
--- a/be/src/io/cache/block_file_cache.cpp
+++ b/be/src/io/cache/block_file_cache.cpp
@@ -858,6 +858,17 @@ FileBlockCell* BlockFileCache::add_cell(const
UInt128Wrapper& hash, const CacheC
return nullptr; /// Empty files are not cached.
}
+ VLOG_DEBUG << "Adding file block to cache. size=" << size << " hash=" <<
hash.to_string()
+ << " offset=" << offset << " cache_type=" <<
cache_type_to_string(context.cache_type)
+ << " expiration_time=" << context.expiration_time
+ << " tablet_id=" << context.tablet_id;
+
+ if (size > 1024 * 1024 * 1024) {
+ LOG(WARNING) << "File block size is too large for a block. size=" <<
size
+ << " hash=" << hash.to_string() << " offset=" << offset
+ << " stack:" << get_stack_trace();
+ }
+
auto& offsets = _files[hash];
auto itr = offsets.find(offset);
if (itr != offsets.end()) {
@@ -1211,10 +1222,10 @@ void BlockFileCache::reset_range(const UInt128Wrapper&
hash, size_t offset, size
if (cell->queue_iterator) {
auto& queue = get_queue(cell->file_block->cache_type());
DCHECK(queue.contains(hash, offset, cache_lock));
- auto iter = queue.get(hash, offset, cache_lock);
- iter->size = new_size;
- queue.cache_size -= old_size;
- queue.cache_size += new_size;
+ queue.resize(*cell->queue_iterator, new_size, cache_lock);
+ _lru_recorder->record_queue_event(cell->file_block->cache_type(),
CacheLRULogType::RESIZE,
+ cell->file_block->get_hash_value(),
+ cell->file_block->offset(),
new_size);
}
_cur_cache_size -= old_size;
_cur_cache_size += new_size;
@@ -1523,6 +1534,13 @@ void LRUQueue::remove_all(std::lock_guard<std::mutex>&
/* cache_lock */) {
void LRUQueue::move_to_end(Iterator queue_it, std::lock_guard<std::mutex>& /*
cache_lock */) {
queue.splice(queue.end(), queue, queue_it);
}
+
+void LRUQueue::resize(Iterator queue_it, size_t new_size,
+ std::lock_guard<std::mutex>& /* cache_lock */) {
+ cache_size -= queue_it->size;
+ queue_it->size = new_size;
+ cache_size += new_size;
+}
bool LRUQueue::contains(const UInt128Wrapper& hash, size_t offset,
std::lock_guard<std::mutex>& /* cache_lock */) const {
return map.find(std::make_pair(hash, offset)) != map.end();
diff --git a/be/src/io/cache/file_cache_common.h
b/be/src/io/cache/file_cache_common.h
index 69427ec3492..96cde89f6c3 100644
--- a/be/src/io/cache/file_cache_common.h
+++ b/be/src/io/cache/file_cache_common.h
@@ -233,6 +233,8 @@ public:
void move_to_end(Iterator queue_it, std::lock_guard<std::mutex>&
cache_lock);
+ void resize(Iterator queue_it, size_t new_size,
std::lock_guard<std::mutex>& cache_lock);
+
std::string to_string(std::lock_guard<std::mutex>& cache_lock) const;
bool contains(const UInt128Wrapper& hash, size_t offset,
diff --git a/be/src/io/cache/fs_file_cache_storage.cpp
b/be/src/io/cache/fs_file_cache_storage.cpp
index 0860265e181..53e4f0c4dd9 100644
--- a/be/src/io/cache/fs_file_cache_storage.cpp
+++ b/be/src/io/cache/fs_file_cache_storage.cpp
@@ -583,6 +583,31 @@ Status
FSFileCacheStorage::parse_filename_suffix_to_cache_type(
return Status::OK();
}
+bool FSFileCacheStorage::handle_already_loaded_block(
+ BlockFileCache* mgr, const UInt128Wrapper& hash, size_t offset, size_t
new_size,
+ int64_t tablet_id, std::lock_guard<std::mutex>& cache_lock) const {
+ auto file_it = mgr->_files.find(hash);
+ if (file_it == mgr->_files.end()) {
+ return false;
+ }
+
+ auto cell_it = file_it->second.find(offset);
+ if (cell_it == file_it->second.end()) {
+ return false;
+ }
+
+ auto block = cell_it->second.file_block;
+ if (tablet_id != 0 && block->tablet_id() == 0) {
+ block->set_tablet_id(tablet_id);
+ }
+
+ size_t old_size = block->range().size();
+ if (old_size != new_size) {
+ mgr->reset_range(hash, offset, old_size, new_size, cache_lock);
+ }
+ return true;
+}
+
void FSFileCacheStorage::load_cache_info_into_memory_from_fs(BlockFileCache*
_mgr) const {
int scan_length = 10000;
std::vector<BatchLoadArgs> batch_load_buffer;
@@ -592,8 +617,8 @@ void
FSFileCacheStorage::load_cache_info_into_memory_from_fs(BlockFileCache* _mg
auto f = [&](const BatchLoadArgs& args) {
// in async load mode, a cell may be added twice.
- if (_mgr->_files.contains(args.hash) &&
_mgr->_files[args.hash].contains(args.offset)) {
- // TODO(zhengyu): update type&expiration if need
+ if (handle_already_loaded_block(_mgr, args.hash, args.offset,
args.size,
+ args.ctx.tablet_id, cache_lock)) {
return;
}
// if the file is tmp, it means it is the old file and it should
be removed
@@ -773,11 +798,8 @@ void
FSFileCacheStorage::load_cache_info_into_memory_from_db(BlockFileCache* _mg
auto f = [&](const BatchLoadArgs& args) {
// in async load mode, a cell may be added twice.
- if (_mgr->_files.contains(args.hash) &&
_mgr->_files[args.hash].contains(args.offset)) {
- auto block = _mgr->_files[args.hash][args.offset].file_block;
- if (block->tablet_id() == 0) {
- block->set_tablet_id(args.ctx.tablet_id);
- }
+ if (handle_already_loaded_block(_mgr, args.hash, args.offset,
args.size,
+ args.ctx.tablet_id, cache_lock)) {
return;
}
_mgr->add_cell(args.hash, args.ctx, args.offset, args.size,
@@ -920,7 +942,10 @@ void
FSFileCacheStorage::load_blocks_directly_unlocked(BlockFileCache* mgr, cons
context_original.cache_type = static_cast<FileCacheType>(block_meta->type);
context_original.tablet_id = key.meta.tablet_id;
- if (!mgr->_files.contains(key.hash) ||
!mgr->_files[key.hash].contains(key.offset)) {
+ if (handle_already_loaded_block(mgr, key.hash, key.offset,
block_meta->size, key.meta.tablet_id,
+ cache_lock)) {
+ return;
+ } else {
mgr->add_cell(key.hash, context_original, key.offset, block_meta->size,
FileBlock::State::DOWNLOADED, cache_lock);
}
diff --git a/be/src/io/cache/fs_file_cache_storage.h
b/be/src/io/cache/fs_file_cache_storage.h
index ea5695438c6..d486552d2b6 100644
--- a/be/src/io/cache/fs_file_cache_storage.h
+++ b/be/src/io/cache/fs_file_cache_storage.h
@@ -111,6 +111,10 @@ private:
void load_cache_info_into_memory(BlockFileCache* _mgr) const;
+ bool handle_already_loaded_block(BlockFileCache* mgr, const
UInt128Wrapper& hash, size_t offset,
+ size_t new_size, int64_t tablet_id,
+ std::lock_guard<std::mutex>& cache_lock)
const;
+
private:
// Helper function to count files in cache directory using statfs
size_t estimate_file_count_from_statfs() const;
diff --git a/be/src/io/cache/lru_queue_recorder.cpp
b/be/src/io/cache/lru_queue_recorder.cpp
index 8308a2a73ad..9907e58cb2a 100644
--- a/be/src/io/cache/lru_queue_recorder.cpp
+++ b/be/src/io/cache/lru_queue_recorder.cpp
@@ -62,6 +62,15 @@ void LRUQueueRecorder::replay_queue_event(FileCacheType
type) {
}
break;
}
+ case CacheLRULogType::RESIZE: {
+ auto it = shadow_queue.get(log->hash, log->offset,
lru_log_lock);
+ if (it != std::list<LRUQueue::FileKeyAndOffset>::iterator()) {
+ shadow_queue.resize(it, log->size, lru_log_lock);
+ } else {
+ LOG(WARNING) << "RESIZE failed, doesn't exist in shadow
queue";
+ }
+ break;
+ }
default:
LOG(WARNING) << "Unknown CacheLRULogType: " <<
static_cast<int>(log->type);
break;
diff --git a/be/src/io/cache/lru_queue_recorder.h
b/be/src/io/cache/lru_queue_recorder.h
index 1f6d69493cf..5bd68b70d55 100644
--- a/be/src/io/cache/lru_queue_recorder.h
+++ b/be/src/io/cache/lru_queue_recorder.h
@@ -31,7 +31,8 @@ enum class CacheLRULogType {
ADD = 0, // all of the integer types
REMOVE = 1,
MOVETOBACK = 2,
- INVALID = 3,
+ RESIZE = 3,
+ INVALID = 4,
};
struct CacheLRULog {
diff --git a/be/src/util/runtime_profile.h b/be/src/util/runtime_profile.h
index a9ccb0910b6..79c0c1538c5 100644
--- a/be/src/util/runtime_profile.h
+++ b/be/src/util/runtime_profile.h
@@ -395,7 +395,7 @@ public:
int64_t level = 2, int64_t condition = 0, int64_t
value = 0)
: Counter(type, value, level),
_condition(condition),
- _value(value),
+ _stored_value(value),
_condition_func(condition_func) {}
Counter* clone() const override {
@@ -405,13 +405,13 @@ public:
int64_t value() const override {
std::lock_guard<std::mutex> l(_mutex);
- return _value;
+ return _stored_value;
}
void conditional_update(int64_t c, int64_t v) {
std::lock_guard<std::mutex> l(_mutex);
if (_condition_func(_condition, c)) {
- _value = v;
+ _stored_value = v;
_condition = c;
}
}
@@ -419,7 +419,7 @@ public:
private:
mutable std::mutex _mutex;
int64_t _condition;
- int64_t _value;
+ int64_t _stored_value;
ConditionCounterFunction _condition_func;
};
diff --git a/be/test/io/cache/block_file_cache_test_meta_store.cpp
b/be/test/io/cache/block_file_cache_test_meta_store.cpp
index 8234d03b527..585c359dd73 100644
--- a/be/test/io/cache/block_file_cache_test_meta_store.cpp
+++ b/be/test/io/cache/block_file_cache_test_meta_store.cpp
@@ -511,6 +511,75 @@ TEST_F(BlockFileCacheTest,
clear_retains_meta_directory_and_clears_meta_entries)
}
}
+TEST_F(BlockFileCacheTest,
handle_already_loaded_block_updates_size_and_tablet) {
+ config::enable_evict_file_cache_in_advance = false;
+ if (fs::exists(cache_base_path)) {
+ fs::remove_all(cache_base_path);
+ }
+ fs::create_directories(cache_base_path);
+
+ io::FileCacheSettings settings;
+ settings.ttl_queue_size = 5000000;
+ settings.ttl_queue_elements = 50000;
+ settings.query_queue_size = 5000000;
+ settings.query_queue_elements = 50000;
+ settings.index_queue_size = 5000000;
+ settings.index_queue_elements = 50000;
+ settings.disposable_queue_size = 5000000;
+ settings.disposable_queue_elements = 50000;
+ settings.capacity = 20000000;
+ settings.max_file_block_size = 100000;
+ settings.max_query_cache_size = 30;
+
+ io::BlockFileCache cache(cache_base_path, settings);
+ ASSERT_TRUE(cache.initialize());
+ for (int i = 0; i < 100; ++i) {
+ if (cache.get_async_open_success()) {
+ break;
+ }
+ std::this_thread::sleep_for(std::chrono::milliseconds(10));
+ }
+ ASSERT_TRUE(cache.get_async_open_success());
+
+ io::CacheContext context;
+ ReadStatistics rstats;
+ context.stats = &rstats;
+ context.cache_type = io::FileCacheType::NORMAL;
+ context.query_id.hi = 11;
+ context.query_id.lo = 12;
+ context.tablet_id = 0;
+ auto key = io::BlockFileCache::hash("sync_cached_block_meta_key");
+
+ constexpr size_t kOriginalSize = 100000;
+ auto holder = cache.get_or_set(key, 0, kOriginalSize, context);
+ auto blocks = fromHolder(holder);
+ ASSERT_EQ(blocks.size(), 1);
+ ASSERT_TRUE(blocks[0]->get_or_set_downloader() ==
io::FileBlock::get_caller_id());
+ download(blocks[0], kOriginalSize);
+ blocks.clear();
+
+ auto* fs_storage = dynamic_cast<FSFileCacheStorage*>(cache._storage.get());
+ ASSERT_NE(fs_storage, nullptr) << "Expected FSFileCacheStorage but got
different storage type";
+
+ constexpr size_t kNewSize = 2 * kOriginalSize;
+ constexpr int64_t kTabletId = 4242;
+ bool handled = false;
+ {
+ SCOPED_CACHE_LOCK(cache._mutex, (&cache));
+ handled = fs_storage->handle_already_loaded_block(&cache, key, 0,
kNewSize, kTabletId,
+ cache_lock);
+ }
+
+ ASSERT_TRUE(handled);
+ auto& cell = cache._files[key][0];
+ EXPECT_EQ(cell.file_block->tablet_id(), kTabletId);
+ EXPECT_EQ(cache._cur_cache_size, kNewSize);
+ EXPECT_EQ(cache._normal_queue.get_capacity_unsafe(), kNewSize);
+
+ if (fs::exists(cache_base_path)) {
+ fs::remove_all(cache_base_path);
+ }
+}
TEST_F(BlockFileCacheTest, estimate_file_count_skips_removed_directory) {
std::string test_dir = cache_base_path +
"/estimate_file_count_removed_dir";
if (fs::exists(test_dir)) {
diff --git a/be/test/io/cache/lru_queue_test.cpp
b/be/test/io/cache/lru_queue_test.cpp
index 4a01fb27e3d..2a9cdc3a6bc 100644
--- a/be/test/io/cache/lru_queue_test.cpp
+++ b/be/test/io/cache/lru_queue_test.cpp
@@ -115,3 +115,14 @@ TEST_F(LRUQueueTest, SameElementsDifferentOrder) {
EXPECT_EQ(queue1->levenshtein_distance_from(*queue2, lock), 2);
}
+
+TEST_F(LRUQueueTest, ResizeUpdatesCacheSize) {
+ std::mutex mutex;
+ std::lock_guard lock(mutex);
+
+ auto iter = queue1->add(UInt128Wrapper(123), 0, 1024, lock);
+ EXPECT_EQ(queue1->get_capacity(lock), 1024);
+
+ queue1->resize(iter, 2048, lock);
+ EXPECT_EQ(queue1->get_capacity(lock), 2048);
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]