This is an automated email from the ASF dual-hosted git repository.
freemandealer pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 54a0c9f46e2 [improvement](filecache) limit file cache LRU replay queues (#64381)
54a0c9f46e2 is described below
commit 54a0c9f46e2305bafa3f1737ed0866e92322f9b7
Author: zhengyu <[email protected]>
AuthorDate: Thu Jun 11 09:18:32 2026 +0800
[improvement](filecache) limit file cache LRU replay queues (#64381)
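Problem Summary: File cache LRU log replay needs lower default replay
latency, bounded in-memory queues, and observability for pending block
LRU updates and LRU log replay. This change:
- lowers the default of file_cache_background_lru_log_replay_interval_ms
  from 1000 ms to 1 ms;
- adds hard caps for the pending block LRU update queue
  (file_cache_background_block_lru_update_queue_max_size, default 500000)
  and for each per-type LRU log queue
  (file_cache_background_lru_log_queue_max_size, default 500000); once a
  queue is full, new entries are rejected, and a cap <= 0 rejects all new
  entries;
- stops recording new LRU log events when tail recording is disabled
  (file_cache_background_lru_dump_tail_record_num <= 0), while keeping the
  backlog that was already queued so it is still replayed;
- exposes LRU log queue length recorders, monotonic produce/consume
  counters for both the pending block update queue and the LRU log
  queues, and an idle counter that counts replay passes that found no
  events in any queue.
QPS bvars are not added because Prometheus can derive rates from the
monotonic counters. Tests that depended on background replay timing now
call a deterministic single-pass replay helper (replay_lru_logs_once).
They also set the background replay interval to one hour, so the
background thread does not replay during the test.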
---
be/src/common/config.cpp | 4 +-
be/src/common/config.h | 2 +
be/src/io/cache/block_file_cache.cpp | 111 +++++++++++++++++----
be/src/io/cache/block_file_cache.h | 11 +-
be/src/io/cache/lru_queue_recorder.cpp | 68 ++++++++++++-
be/src/io/cache/lru_queue_recorder.h | 11 +-
.../io/cache/block_file_cache_test_lru_dump.cpp | 73 +++++++++++++-
.../io/cache/block_file_cache_test_meta_store.cpp | 28 +++++-
be/test/io/cache/cache_lru_dumper_test.cpp | 51 +++++++++-
9 files changed, 324 insertions(+), 35 deletions(-)
diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 0ad6ce6d28f..86b7d12e0aa 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -1235,6 +1235,7 @@ DEFINE_mInt64(file_cache_remove_block_qps_limit, "1000");
DEFINE_mInt64(file_cache_background_gc_interval_ms, "100");
DEFINE_mInt64(file_cache_background_block_lru_update_interval_ms, "5000");
DEFINE_mInt64(file_cache_background_block_lru_update_qps_limit, "1000");
+DEFINE_mInt64(file_cache_background_block_lru_update_queue_max_size, "500000");
DEFINE_mBool(enable_file_cache_async_touch_on_get_or_set, "false");
DEFINE_mBool(enable_reader_dryrun_when_download_file_cache, "true");
DEFINE_mInt64(file_cache_background_monitor_interval_ms, "5000");
@@ -1245,7 +1246,8 @@ DEFINE_mInt64(file_cache_background_lru_dump_interval_ms,
"60000");
// dump queue only if the queue update specific times through several dump
intervals
DEFINE_mInt64(file_cache_background_lru_dump_update_cnt_threshold, "1000");
DEFINE_mInt64(file_cache_background_lru_dump_tail_record_num, "5000000");
-DEFINE_mInt64(file_cache_background_lru_log_replay_interval_ms, "1000");
+DEFINE_mInt64(file_cache_background_lru_log_queue_max_size, "500000");
+DEFINE_mInt64(file_cache_background_lru_log_replay_interval_ms, "1");
DEFINE_mBool(enable_evaluate_shadow_queue_diff, "false");
DEFINE_mBool(file_cache_enable_only_warm_up_idx, "false");
diff --git a/be/src/common/config.h b/be/src/common/config.h
index c13e5494c6b..0b415ed5d2c 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -1280,6 +1280,7 @@ DECLARE_mInt64(file_cache_remove_block_qps_limit);
DECLARE_mInt64(file_cache_background_gc_interval_ms);
DECLARE_mInt64(file_cache_background_block_lru_update_interval_ms);
DECLARE_mInt64(file_cache_background_block_lru_update_qps_limit);
+DECLARE_mInt64(file_cache_background_block_lru_update_queue_max_size);
DECLARE_mBool(enable_file_cache_async_touch_on_get_or_set);
DECLARE_mBool(enable_reader_dryrun_when_download_file_cache);
DECLARE_mInt64(file_cache_background_monitor_interval_ms);
@@ -1293,6 +1294,7 @@
DECLARE_mInt64(file_cache_background_lru_dump_interval_ms);
// dump queue only if the queue update specific times through several dump
intervals
DECLARE_mInt64(file_cache_background_lru_dump_update_cnt_threshold);
DECLARE_mInt64(file_cache_background_lru_dump_tail_record_num);
+DECLARE_mInt64(file_cache_background_lru_log_queue_max_size);
DECLARE_mInt64(file_cache_background_lru_log_replay_interval_ms);
DECLARE_mBool(enable_evaluate_shadow_queue_diff);
diff --git a/be/src/io/cache/block_file_cache.cpp
b/be/src/io/cache/block_file_cache.cpp
index dfbad8a63ed..5c4a275ac3d 100644
--- a/be/src/io/cache/block_file_cache.cpp
+++ b/be/src/io/cache/block_file_cache.cpp
@@ -42,6 +42,7 @@
#include <ranges>
#include "common/cast_set.h"
+#include "common/check.h"
#include "common/config.h"
#include "common/logging.h"
#include "core/uint128.h"
@@ -59,24 +60,53 @@
#include "util/time.h"
namespace doris::io {
+namespace {
+
+constexpr std::array<FileCacheType, 4> LRU_LOG_REPLAY_TYPES = {
+ FileCacheType::TTL, FileCacheType::INDEX, FileCacheType::NORMAL,
FileCacheType::DISPOSABLE};
+
+size_t file_cache_type_index(FileCacheType type) {
+ return static_cast<size_t>(type);
+}
+
+} // namespace
+
// Insert a block pointer into one shard while swallowing allocation failures.
-bool NeedUpdateLRUBlocks::insert(FileBlockSPtr block) {
- if (!block) {
+bool NeedUpdateLRUBlocks::insert(FileBlockSPtr block, size_t max_queue_size) {
+ if (!block || max_queue_size == 0) {
return false;
}
+ bool reserved = false;
try {
auto* raw_ptr = block.get();
auto idx = shard_index(raw_ptr);
auto& shard = _shards[idx];
std::lock_guard lock(shard.mutex);
- auto [_, inserted] = shard.entries.emplace(raw_ptr, std::move(block));
- if (inserted) {
- _size.fetch_add(1, std::memory_order_relaxed);
+ if (shard.entries.contains(raw_ptr)) {
+ return false;
}
- return inserted;
+ size_t cur_size = _size.load(std::memory_order_relaxed);
+ while (cur_size < max_queue_size) {
+ if (_size.compare_exchange_weak(cur_size, cur_size + 1,
std::memory_order_relaxed)) {
+ reserved = true;
+ break;
+ }
+ }
+ if (!reserved) {
+ return false;
+ }
+ auto [_, inserted] = shard.entries.emplace(raw_ptr, std::move(block));
+ DORIS_CHECK(inserted);
+ return true;
} catch (const std::exception& e) {
+ if (reserved) {
+ decrease_size(1);
+ }
LOG(WARNING) << "Failed to enqueue block for LRU update: " << e.what();
} catch (...) {
+ if (reserved) {
+ decrease_size(1);
+ }
LOG(WARNING) << "Failed to enqueue block for LRU update: unknown
error";
}
return false;
@@ -103,7 +133,7 @@ size_t NeedUpdateLRUBlocks::drain(size_t limit,
std::vector<FileBlockSPtr>* outp
++shard_drained;
}
if (shard_drained > 0) {
- _size.fetch_sub(shard_drained, std::memory_order_relaxed);
+ decrease_size(shard_drained);
drained += shard_drained;
}
}
@@ -123,7 +153,7 @@ void NeedUpdateLRUBlocks::clear() {
if (!shard.entries.empty()) {
auto removed = shard.entries.size();
shard.entries.clear();
- _size.fetch_sub(removed, std::memory_order_relaxed);
+ decrease_size(removed);
}
}
} catch (const std::exception& e) {
@@ -133,6 +163,16 @@ void NeedUpdateLRUBlocks::clear() {
}
}
+void NeedUpdateLRUBlocks::decrease_size(size_t delta) {
+ size_t cur_size = _size.load(std::memory_order_relaxed);
+ while (true) {
+ DORIS_CHECK_GE(cur_size, delta);
+ if (_size.compare_exchange_weak(cur_size, cur_size - delta,
std::memory_order_relaxed)) {
+ return;
+ }
+ }
+}
+
size_t NeedUpdateLRUBlocks::shard_index(FileBlock* ptr) const {
DCHECK(ptr != nullptr);
return std::hash<FileBlock*> {}(ptr)&kShardMask;
@@ -348,12 +388,30 @@ BlockFileCache::BlockFileCache(const std::string&
cache_base_path,
_cache_base_path.c_str(), "file_cache_recycle_keys_length");
_need_update_lru_blocks_length_recorder =
std::make_shared<bvar::LatencyRecorder>(
_cache_base_path.c_str(),
"file_cache_need_update_lru_blocks_length");
+ _need_update_lru_blocks_produce_metrics =
std::make_shared<bvar::Adder<size_t>>(
+ _cache_base_path.c_str(),
"file_cache_need_update_lru_blocks_produce");
+ _need_update_lru_blocks_consume_metrics =
std::make_shared<bvar::Adder<size_t>>(
+ _cache_base_path.c_str(),
"file_cache_need_update_lru_blocks_consume");
_update_lru_blocks_latency_us = std::make_shared<bvar::LatencyRecorder>(
_cache_base_path.c_str(),
"file_cache_update_lru_blocks_latency_us");
_ttl_gc_latency_us =
std::make_shared<bvar::LatencyRecorder>(_cache_base_path.c_str(),
"file_cache_ttl_gc_latency_us");
_shadow_queue_levenshtein_distance =
std::make_shared<bvar::LatencyRecorder>(
_cache_base_path.c_str(),
"file_cache_shadow_queue_levenshtein_distance");
+ for (FileCacheType type : {FileCacheType::DISPOSABLE,
FileCacheType::NORMAL,
+ FileCacheType::INDEX, FileCacheType::TTL}) {
+ size_t idx = file_cache_type_index(type);
+ std::string metric_prefix =
+ "file_cache_lru_recorder_" + cache_type_to_string(type) +
"_record_queue";
+ _lru_recorder_queue_length_recorder[idx] =
std::make_shared<bvar::LatencyRecorder>(
+ _cache_base_path.c_str(), metric_prefix + "_length");
+ _lru_recorder_queue_produce_metrics[idx] =
std::make_shared<bvar::Adder<size_t>>(
+ _cache_base_path.c_str(), metric_prefix + "_produce");
+ _lru_recorder_queue_consume_metrics[idx] =
std::make_shared<bvar::Adder<size_t>>(
+ _cache_base_path.c_str(), metric_prefix + "_consume");
+ }
+ _lru_recorder_log_replay_idle_metrics =
std::make_shared<bvar::Adder<size_t>>(
+ _cache_base_path.c_str(),
"file_cache_lru_recorder_log_replay_idle");
_disposable_queue = LRUQueue(cache_settings.disposable_queue_size,
cache_settings.disposable_queue_elements, 60
* 60);
@@ -648,7 +706,10 @@ FileBlocks BlockFileCache::get_impl(const UInt128Wrapper&
hash, const CacheConte
}
void BlockFileCache::add_need_update_lru_block(FileBlockSPtr block) {
- if (_need_update_lru_blocks.insert(std::move(block))) {
+ int64_t queue_limit =
config::file_cache_background_block_lru_update_queue_max_size;
+ size_t max_queue_size = queue_limit <= 0 ? 0 :
static_cast<size_t>(queue_limit);
+ if (_need_update_lru_blocks.insert(std::move(block), max_queue_size)) {
+ *_need_update_lru_blocks_produce_metrics << 1;
*_need_update_lru_blocks_length_recorder <<
_need_update_lru_blocks.size();
}
}
@@ -2161,6 +2222,7 @@ void BlockFileCache::run_background_block_lru_update() {
*_need_update_lru_blocks_length_recorder <<
_need_update_lru_blocks.size();
continue;
}
+ *_need_update_lru_blocks_consume_metrics << drained;
int64_t duration_ns = 0;
{
@@ -2376,19 +2438,28 @@ void BlockFileCache::run_background_lru_log_replay() {
}
}
- _lru_recorder->replay_queue_event(FileCacheType::TTL);
- _lru_recorder->replay_queue_event(FileCacheType::INDEX);
- _lru_recorder->replay_queue_event(FileCacheType::NORMAL);
- _lru_recorder->replay_queue_event(FileCacheType::DISPOSABLE);
+ replay_lru_logs_once();
+ }
+}
+
+size_t BlockFileCache::replay_lru_logs_once() {
+ size_t replayed = 0;
+ for (FileCacheType type : LRU_LOG_REPLAY_TYPES) {
+ replayed += _lru_recorder->replay_queue_event(type);
+ }
- if (config::enable_evaluate_shadow_queue_diff) {
- SCOPED_CACHE_LOCK(_mutex, this);
- _lru_recorder->evaluate_queue_diff(_ttl_queue, "ttl", cache_lock);
- _lru_recorder->evaluate_queue_diff(_index_queue, "index",
cache_lock);
- _lru_recorder->evaluate_queue_diff(_normal_queue, "normal",
cache_lock);
- _lru_recorder->evaluate_queue_diff(_disposable_queue,
"disposable", cache_lock);
- }
+ if (replayed == 0) {
+ *_lru_recorder_log_replay_idle_metrics << 1;
+ }
+
+ if (config::enable_evaluate_shadow_queue_diff) {
+ SCOPED_CACHE_LOCK(_mutex, this);
+ _lru_recorder->evaluate_queue_diff(_ttl_queue, "ttl", cache_lock);
+ _lru_recorder->evaluate_queue_diff(_index_queue, "index", cache_lock);
+ _lru_recorder->evaluate_queue_diff(_normal_queue, "normal",
cache_lock);
+ _lru_recorder->evaluate_queue_diff(_disposable_queue, "disposable",
cache_lock);
}
+ return replayed;
}
void BlockFileCache::dump_lru_queues(bool force) {
diff --git a/be/src/io/cache/block_file_cache.h
b/be/src/io/cache/block_file_cache.h
index 5bdbd09f914..5c7faea9ac4 100644
--- a/be/src/io/cache/block_file_cache.h
+++ b/be/src/io/cache/block_file_cache.h
@@ -25,6 +25,7 @@
#include <atomic>
#include <boost/lockfree/spsc_queue.hpp>
#include <functional>
+#include <limits>
#include <memory>
#include <mutex>
#include <optional>
@@ -91,7 +92,7 @@ public:
// Insert a block into the pending set. Returns true only when the block
// was not already queued. Null inputs are ignored.
- bool insert(FileBlockSPtr block);
+ bool insert(FileBlockSPtr block, size_t max_queue_size =
std::numeric_limits<size_t>::max());
// Drain up to `limit` unique blocks into `output`. The method returns how
// many blocks were actually drained and shrinks the internal size
@@ -114,6 +115,7 @@ private:
};
size_t shard_index(FileBlock* ptr) const;
+ void decrease_size(size_t delta);
std::array<Shard, kShardCount> _shards;
std::atomic<size_t> _size {0};
@@ -469,6 +471,7 @@ private:
void run_background_monitor();
void run_background_gc();
void run_background_lru_log_replay();
+ size_t replay_lru_logs_once();
void run_background_lru_dump();
void restore_lru_queues_from_disk(std::lock_guard<std::mutex>& cache_lock);
void run_background_evict_in_advance();
@@ -616,9 +619,15 @@ private:
std::shared_ptr<bvar::LatencyRecorder> _recycle_keys_length_recorder;
std::shared_ptr<bvar::LatencyRecorder> _update_lru_blocks_latency_us;
std::shared_ptr<bvar::LatencyRecorder>
_need_update_lru_blocks_length_recorder;
+ std::shared_ptr<bvar::Adder<size_t>>
_need_update_lru_blocks_produce_metrics;
+ std::shared_ptr<bvar::Adder<size_t>>
_need_update_lru_blocks_consume_metrics;
std::shared_ptr<bvar::LatencyRecorder> _ttl_gc_latency_us;
std::shared_ptr<bvar::LatencyRecorder> _shadow_queue_levenshtein_distance;
+ std::array<std::shared_ptr<bvar::LatencyRecorder>, 4>
_lru_recorder_queue_length_recorder;
+ std::array<std::shared_ptr<bvar::Adder<size_t>>, 4>
_lru_recorder_queue_produce_metrics;
+ std::array<std::shared_ptr<bvar::Adder<size_t>>, 4>
_lru_recorder_queue_consume_metrics;
+ std::shared_ptr<bvar::Adder<size_t>> _lru_recorder_log_replay_idle_metrics;
// keep _storage last so it will deconstruct first
// otherwise, load_cache_info_into_memory might crash
// coz it will use other members of BlockFileCache
diff --git a/be/src/io/cache/lru_queue_recorder.cpp
b/be/src/io/cache/lru_queue_recorder.cpp
index 9907e58cb2a..e2a3a8fa506 100644
--- a/be/src/io/cache/lru_queue_recorder.cpp
+++ b/be/src/io/cache/lru_queue_recorder.cpp
@@ -17,27 +17,53 @@
#include "io/cache/lru_queue_recorder.h"
+#include "common/check.h"
+#include "common/config.h"
#include "io/cache/block_file_cache.h"
#include "io/cache/file_cache_common.h"
namespace doris::io {
+namespace {
+
+size_t file_cache_type_index(FileCacheType type) {
+ return static_cast<size_t>(type);
+}
+
+} // namespace
+
void LRUQueueRecorder::record_queue_event(FileCacheType type, CacheLRULogType
log_type,
const UInt128Wrapper hash, const
size_t offset,
const size_t size) {
- CacheLRULogQueue& log_queue = get_lru_log_queue(type);
- log_queue.enqueue(std::make_unique<CacheLRULog>(log_type, hash, offset,
size));
+ if (config::file_cache_background_lru_dump_tail_record_num <= 0) {
+ return;
+ }
++(_lru_queue_update_cnt_from_last_dump[type]);
+ auto log = std::make_unique<CacheLRULog>(log_type, hash, offset, size);
+ if (!reserve_lru_log_queue_slot(type)) {
+ return;
+ }
+ CacheLRULogQueue& log_queue = get_lru_log_queue(type);
+ if (!log_queue.enqueue(std::move(log))) {
+ release_lru_log_queue_slot(type);
+ return;
+ }
+ size_t idx = file_cache_type_index(type);
+ *(_mgr->_lru_recorder_queue_produce_metrics[idx]) << 1;
+ *(_mgr->_lru_recorder_queue_length_recorder[idx]) <<
lru_log_queue_size(type);
}
-void LRUQueueRecorder::replay_queue_event(FileCacheType type) {
+size_t LRUQueueRecorder::replay_queue_event(FileCacheType type) {
// we don't need the real cache lock for the shadow queue, but we do need
a lock to prevent read/write contension
CacheLRULogQueue& log_queue = get_lru_log_queue(type);
LRUQueue& shadow_queue = get_shadow_queue(type);
std::lock_guard<std::mutex> lru_log_lock(_mutex_lru_log);
std::unique_ptr<CacheLRULog> log;
+ size_t replayed = 0;
while (log_queue.try_dequeue(log)) {
+ release_lru_log_queue_slot(type);
+ ++replayed;
try {
switch (log->type) {
case CacheLRULogType::ADD: {
@@ -79,6 +105,12 @@ void LRUQueueRecorder::replay_queue_event(FileCacheType
type) {
LOG(WARNING) << "Failed to replay queue event: " << e.what();
}
}
+ size_t idx = file_cache_type_index(type);
+ if (replayed > 0) {
+ *(_mgr->_lru_recorder_queue_consume_metrics[idx]) << replayed;
+ }
+ *(_mgr->_lru_recorder_queue_length_recorder[idx]) <<
lru_log_queue_size(type);
+ return replayed;
}
// we evaluate the diff between two queue by calculate how many operation is
@@ -137,4 +169,34 @@ void
LRUQueueRecorder::reset_lru_queue_update_cnt_from_last_dump(FileCacheType t
_lru_queue_update_cnt_from_last_dump[type] = 0;
}
+size_t LRUQueueRecorder::lru_log_queue_size(FileCacheType type) const {
+ return
_lru_log_queue_size[file_cache_type_index(type)].load(std::memory_order_relaxed);
+}
+
+bool LRUQueueRecorder::reserve_lru_log_queue_slot(FileCacheType type) {
+ int64_t queue_limit = config::file_cache_background_lru_log_queue_max_size;
+ if (queue_limit <= 0) {
+ return false;
+ }
+ auto& queue_size = _lru_log_queue_size[file_cache_type_index(type)];
+ size_t cur_size = queue_size.load(std::memory_order_relaxed);
+ while (cur_size < static_cast<size_t>(queue_limit)) {
+ if (queue_size.compare_exchange_weak(cur_size, cur_size + 1,
std::memory_order_relaxed)) {
+ return true;
+ }
+ }
+ return false;
+}
+
+void LRUQueueRecorder::release_lru_log_queue_slot(FileCacheType type) {
+ auto& queue_size = _lru_log_queue_size[file_cache_type_index(type)];
+ size_t cur_size = queue_size.load(std::memory_order_relaxed);
+ while (true) {
+ DORIS_CHECK_GT(cur_size, 0);
+ if (queue_size.compare_exchange_weak(cur_size, cur_size - 1,
std::memory_order_relaxed)) {
+ return;
+ }
+ }
+}
+
} // end of namespace doris::io
diff --git a/be/src/io/cache/lru_queue_recorder.h
b/be/src/io/cache/lru_queue_recorder.h
index 5bd68b70d55..5ffa777e437 100644
--- a/be/src/io/cache/lru_queue_recorder.h
+++ b/be/src/io/cache/lru_queue_recorder.h
@@ -19,7 +19,11 @@
#include <concurrentqueue.h>
+#include <array>
+#include <atomic>
#include <boost/lockfree/spsc_queue.hpp>
+#include <mutex>
+#include <unordered_map>
#include "io/cache/file_cache_common.h"
@@ -57,11 +61,12 @@ public:
}
void record_queue_event(FileCacheType type, CacheLRULogType log_type,
const UInt128Wrapper hash,
const size_t offset, const size_t size);
- void replay_queue_event(FileCacheType type);
+ size_t replay_queue_event(FileCacheType type);
void evaluate_queue_diff(LRUQueue& base, std::string name,
std::lock_guard<std::mutex>& cache_lock);
size_t get_lru_queue_update_cnt_from_last_dump(FileCacheType type);
void reset_lru_queue_update_cnt_from_last_dump(FileCacheType type);
+ size_t lru_log_queue_size(FileCacheType type) const;
CacheLRULogQueue& get_lru_log_queue(FileCacheType type);
LRUQueue& get_shadow_queue(FileCacheType type);
@@ -81,8 +86,12 @@ private:
CacheLRULogQueue _disposable_lru_log_queue;
std::unordered_map<FileCacheType, size_t>
_lru_queue_update_cnt_from_last_dump;
+ std::array<std::atomic<size_t>, 4> _lru_log_queue_size {};
BlockFileCache* _mgr;
+
+ bool reserve_lru_log_queue_slot(FileCacheType type);
+ void release_lru_log_queue_slot(FileCacheType type);
};
} // namespace doris::io
diff --git a/be/test/io/cache/block_file_cache_test_lru_dump.cpp
b/be/test/io/cache/block_file_cache_test_lru_dump.cpp
index 6d6681f4fcd..3a581d77c76 100644
--- a/be/test/io/cache/block_file_cache_test_lru_dump.cpp
+++ b/be/test/io/cache/block_file_cache_test_lru_dump.cpp
@@ -27,6 +27,11 @@ TEST_F(BlockFileCacheTest,
test_lru_log_record_replay_dump_restore) {
config::file_cache_enter_disk_resource_limit_mode_percent = 99;
config::file_cache_background_lru_dump_interval_ms = 3000;
config::file_cache_background_lru_dump_update_cnt_threshold = 0;
+ const auto old_replay_interval_ms =
config::file_cache_background_lru_log_replay_interval_ms;
+ Defer defer {[old_replay_interval_ms] {
+ config::file_cache_background_lru_log_replay_interval_ms =
old_replay_interval_ms;
+ }};
+ config::file_cache_background_lru_log_replay_interval_ms = 60 * 60 * 1000;
if (fs::exists(cache_base_path)) {
fs::remove_all(cache_base_path);
}
@@ -162,8 +167,7 @@ TEST_F(BlockFileCacheTest,
test_lru_log_record_replay_dump_restore) {
ASSERT_EQ(cache._lru_recorder->_disposable_lru_log_queue.size_approx(), 5);
// then check the log replay
- std::this_thread::sleep_for(std::chrono::milliseconds(
- 2 * config::file_cache_background_lru_log_replay_interval_ms));
+ ASSERT_EQ(cache.replay_lru_logs_once(), 20);
ASSERT_EQ(cache._lru_recorder->_shadow_ttl_queue.get_elements_num_unsafe(), 5);
ASSERT_EQ(cache._lru_recorder->_shadow_index_queue.get_elements_num_unsafe(),
5);
ASSERT_EQ(cache._lru_recorder->_shadow_normal_queue.get_elements_num_unsafe(),
5);
@@ -180,8 +184,7 @@ TEST_F(BlockFileCacheTest,
test_lru_log_record_replay_dump_restore) {
ASSERT_EQ(cache._lru_recorder->_normal_lru_log_queue.size_approx(), 0);
ASSERT_EQ(cache._lru_recorder->_disposable_lru_log_queue.size_approx(), 0);
- std::this_thread::sleep_for(std::chrono::milliseconds(
- 2 * config::file_cache_background_lru_log_replay_interval_ms));
+ ASSERT_EQ(cache.replay_lru_logs_once(), 6);
ASSERT_EQ(cache._lru_recorder->_shadow_ttl_queue.get_elements_num_unsafe(), 0);
ASSERT_EQ(cache._lru_recorder->_shadow_index_queue.get_elements_num_unsafe(),
5);
ASSERT_EQ(cache._lru_recorder->_shadow_normal_queue.get_elements_num_unsafe(),
5);
@@ -493,6 +496,68 @@ TEST_F(BlockFileCacheTest,
test_lru_duplicate_queue_entry_restore) {
}
}
+TEST_F(BlockFileCacheTest, need_update_lru_blocks_hard_cap) {
+ std::string cache_base_path = caches_dir /
"cache_need_update_lru_blocks_hard_cap" / "";
+ const auto old_update_interval_ms =
config::file_cache_background_block_lru_update_interval_ms;
+ const auto old_update_queue_max_size =
+ config::file_cache_background_block_lru_update_queue_max_size;
+ Defer defer {[old_update_interval_ms, old_update_queue_max_size] {
+ config::file_cache_background_block_lru_update_interval_ms =
old_update_interval_ms;
+ config::file_cache_background_block_lru_update_queue_max_size =
old_update_queue_max_size;
+ }};
+
+ config::file_cache_background_block_lru_update_interval_ms = 60 * 60 *
1000;
+ config::file_cache_background_block_lru_update_queue_max_size = 2;
+
+ if (fs::exists(cache_base_path)) {
+ fs::remove_all(cache_base_path);
+ }
+ fs::create_directories(cache_base_path);
+
+ io::FileCacheSettings settings;
+ settings.query_queue_size = 5000000;
+ settings.query_queue_elements = 50000;
+ settings.index_queue_size = 5000000;
+ settings.index_queue_elements = 50000;
+ settings.disposable_queue_size = 5000000;
+ settings.disposable_queue_elements = 50000;
+ settings.ttl_queue_size = 5000000;
+ settings.ttl_queue_elements = 50000;
+ settings.capacity = 20000000;
+ settings.max_file_block_size = 100000;
+ settings.max_query_cache_size = 30;
+
+ io::BlockFileCache cache(cache_base_path, settings);
+ ASSERT_TRUE(cache.initialize());
+ wait_until_cache_ready(cache);
+
+ io::CacheContext context;
+ ReadStatistics rstats;
+ context.stats = &rstats;
+ context.cache_type = io::FileCacheType::NORMAL;
+ auto key = io::BlockFileCache::hash("need_update_lru_blocks_hard_cap");
+
+ std::vector<io::FileBlockSPtr> blocks;
+ for (size_t offset = 0; offset < 300000; offset += 100000) {
+ auto holder = cache.get_or_set(key, offset, 100000, context);
+ auto holder_blocks = fromHolder(holder);
+ ASSERT_EQ(holder_blocks.size(), 1);
+ blocks.push_back(holder_blocks[0]);
+ }
+
+ cache.add_need_update_lru_block(blocks[0]);
+ cache.add_need_update_lru_block(blocks[0]);
+ EXPECT_EQ(cache.need_update_lru_blocks_size_unsafe(), 1);
+
+ cache.add_need_update_lru_block(blocks[1]);
+ cache.add_need_update_lru_block(blocks[2]);
+ EXPECT_EQ(cache.need_update_lru_blocks_size_unsafe(), 2);
+
+ if (fs::exists(cache_base_path)) {
+ fs::remove_all(cache_base_path);
+ }
+}
+
TEST_F(BlockFileCacheTest, cached_remote_file_reader_direct_read_order_check) {
std::string cache_base_path = caches_dir / "cache_direct_read_order_check"
/ "";
config::enable_read_cache_file_directly = true;
diff --git a/be/test/io/cache/block_file_cache_test_meta_store.cpp
b/be/test/io/cache/block_file_cache_test_meta_store.cpp
index eca106ebd7a..e2d2b5a957d 100644
--- a/be/test/io/cache/block_file_cache_test_meta_store.cpp
+++ b/be/test/io/cache/block_file_cache_test_meta_store.cpp
@@ -23,6 +23,8 @@
#pragma clang diagnostic ignored "-Wkeyword-macro"
#endif
+#include "util/defer_op.h"
+
#define private public
#define protected public
#include "io/cache/block_file_cache_test_common.h"
@@ -55,12 +57,30 @@ void verify_meta_key(CacheBlockMetaStore& meta_store,
int64_t tablet_id,
} // namespace
TEST_F(BlockFileCacheTest, version3_add_remove_restart) {
+ const auto old_enable_evict = config::enable_evict_file_cache_in_advance;
+ const auto old_disk_limit_percent =
config::file_cache_enter_disk_resource_limit_mode_percent;
+ const auto old_dump_interval_ms =
config::file_cache_background_lru_dump_interval_ms;
+ const auto old_dump_update_cnt_threshold =
+ config::file_cache_background_lru_dump_update_cnt_threshold;
+ const auto old_dump_tail_record_num =
config::file_cache_background_lru_dump_tail_record_num;
+ const auto old_replay_interval_ms =
config::file_cache_background_lru_log_replay_interval_ms;
+ Defer defer {[old_enable_evict, old_disk_limit_percent,
old_dump_interval_ms,
+ old_dump_update_cnt_threshold, old_dump_tail_record_num,
old_replay_interval_ms] {
+ config::enable_evict_file_cache_in_advance = old_enable_evict;
+ config::file_cache_enter_disk_resource_limit_mode_percent =
old_disk_limit_percent;
+ config::file_cache_background_lru_dump_interval_ms =
old_dump_interval_ms;
+ config::file_cache_background_lru_dump_update_cnt_threshold =
old_dump_update_cnt_threshold;
+ config::file_cache_background_lru_dump_tail_record_num =
old_dump_tail_record_num;
+ config::file_cache_background_lru_log_replay_interval_ms =
old_replay_interval_ms;
+ }};
+
config::enable_evict_file_cache_in_advance = false;
config::file_cache_enter_disk_resource_limit_mode_percent = 99;
config::file_cache_background_lru_dump_interval_ms = 3000;
config::file_cache_background_lru_dump_update_cnt_threshold = 0;
config::file_cache_background_lru_dump_tail_record_num =
2; // only dump last 2, to check dump works with meta store
+ config::file_cache_background_lru_log_replay_interval_ms = 60 * 60 * 1000;
if (fs::exists(cache_base_path)) {
fs::remove_all(cache_base_path);
}
@@ -239,8 +259,7 @@ TEST_F(BlockFileCacheTest, version3_add_remove_restart) {
ASSERT_EQ(cache._lru_recorder->_disposable_lru_log_queue.size_approx(), 5);
// then check the log replay
- std::this_thread::sleep_for(std::chrono::milliseconds(
- 2 * config::file_cache_background_lru_log_replay_interval_ms));
+ ASSERT_EQ(cache.replay_lru_logs_once(), 20);
ASSERT_EQ(cache._lru_recorder->_shadow_ttl_queue.get_elements_num_unsafe(), 5);
ASSERT_EQ(cache._lru_recorder->_shadow_index_queue.get_elements_num_unsafe(),
5);
ASSERT_EQ(cache._lru_recorder->_shadow_normal_queue.get_elements_num_unsafe(),
5);
@@ -251,12 +270,13 @@ TEST_F(BlockFileCacheTest, version3_add_remove_restart) {
cache.remove_if_cached(key2); // remove all element from index
queue
}
- std::this_thread::sleep_for(std::chrono::milliseconds(
- 2 * config::file_cache_background_lru_log_replay_interval_ms));
+ ASSERT_EQ(cache.replay_lru_logs_once(), 5);
ASSERT_EQ(cache._lru_recorder->_shadow_ttl_queue.get_elements_num_unsafe(), 5);
ASSERT_EQ(cache._lru_recorder->_shadow_index_queue.get_elements_num_unsafe(),
0);
ASSERT_EQ(cache._lru_recorder->_shadow_normal_queue.get_elements_num_unsafe(),
5);
ASSERT_EQ(cache._lru_recorder->_shadow_disposable_queue.get_elements_num_unsafe(),
5);
+ EXPECT_EQ(cache.replay_lru_logs_once(), 0);
+ EXPECT_EQ(cache._lru_recorder_log_replay_idle_metrics->get_value(), 1);
// check the meta store to see the content
{
diff --git a/be/test/io/cache/cache_lru_dumper_test.cpp
b/be/test/io/cache/cache_lru_dumper_test.cpp
index 76647ba544f..e2fdfb6a7e7 100644
--- a/be/test/io/cache/cache_lru_dumper_test.cpp
+++ b/be/test/io/cache/cache_lru_dumper_test.cpp
@@ -19,11 +19,13 @@
#include <filesystem>
+#include "common/config.h"
#include "gmock/gmock.h"
#include "gtest/gtest.h"
#include "io/cache/block_file_cache.h"
#include "io/cache/file_block.h"
#include "io/cache/file_cache_common.h"
+#include "util/defer_op.h"
using ::testing::_;
using ::testing::Return;
@@ -158,4 +160,51 @@ TEST_F(CacheLRUDumperTest, test_dump_and_restore_queue) {
}
}
-} // namespace doris::io
\ No newline at end of file
+TEST_F(CacheLRUDumperTest,
test_lru_log_record_disabled_keeps_existing_backlog) {
+ const auto old_tail_record_num =
config::file_cache_background_lru_dump_tail_record_num;
+ const auto old_queue_limit =
config::file_cache_background_lru_log_queue_max_size;
+ Defer defer {[old_tail_record_num, old_queue_limit] {
+ config::file_cache_background_lru_dump_tail_record_num =
old_tail_record_num;
+ config::file_cache_background_lru_log_queue_max_size = old_queue_limit;
+ }};
+
+ config::file_cache_background_lru_dump_tail_record_num = 2;
+ config::file_cache_background_lru_log_queue_max_size = 10;
+
+ UInt128Wrapper hash(123456789ULL);
+ recorder->record_queue_event(FileCacheType::NORMAL, CacheLRULogType::ADD,
hash, 0, 4096);
+ ASSERT_EQ(recorder->lru_log_queue_size(FileCacheType::NORMAL), 1);
+
+ config::file_cache_background_lru_dump_tail_record_num = 0;
+ recorder->record_queue_event(FileCacheType::NORMAL, CacheLRULogType::ADD,
hash, 4096, 4096);
+
+ EXPECT_EQ(recorder->lru_log_queue_size(FileCacheType::NORMAL), 1);
+
EXPECT_EQ(recorder->get_lru_log_queue(FileCacheType::NORMAL).size_approx(), 1);
+ EXPECT_EQ(recorder->replay_queue_event(FileCacheType::NORMAL), 1);
+
EXPECT_EQ(recorder->get_shadow_queue(FileCacheType::NORMAL).get_elements_num_unsafe(),
1);
+}
+
+TEST_F(CacheLRUDumperTest, test_lru_log_record_queue_hard_cap) {
+ const auto old_tail_record_num =
config::file_cache_background_lru_dump_tail_record_num;
+ const auto old_queue_limit =
config::file_cache_background_lru_log_queue_max_size;
+ Defer defer {[old_tail_record_num, old_queue_limit] {
+ config::file_cache_background_lru_dump_tail_record_num =
old_tail_record_num;
+ config::file_cache_background_lru_log_queue_max_size = old_queue_limit;
+ }};
+
+ config::file_cache_background_lru_dump_tail_record_num = 100;
+ config::file_cache_background_lru_log_queue_max_size = 2;
+
+ UInt128Wrapper hash(987654321ULL);
+ recorder->record_queue_event(FileCacheType::INDEX, CacheLRULogType::ADD,
hash, 0, 4096);
+ recorder->record_queue_event(FileCacheType::INDEX, CacheLRULogType::ADD,
hash, 4096, 4096);
+ recorder->record_queue_event(FileCacheType::INDEX, CacheLRULogType::ADD,
hash, 8192, 4096);
+
+ EXPECT_EQ(recorder->lru_log_queue_size(FileCacheType::INDEX), 2);
+ EXPECT_EQ(recorder->get_lru_log_queue(FileCacheType::INDEX).size_approx(),
2);
+ EXPECT_EQ(recorder->replay_queue_event(FileCacheType::INDEX), 2);
+ EXPECT_EQ(recorder->lru_log_queue_size(FileCacheType::INDEX), 0);
+
EXPECT_EQ(recorder->get_shadow_queue(FileCacheType::INDEX).get_elements_num_unsafe(),
2);
+}
+
+} // namespace doris::io
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]