This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 8120c598d35 [enhancement](cloud) refine block file cache evict policy
(#42451) (#43201)
8120c598d35 is described below
commit 8120c598d350368cc726a23ac2060b141818317e
Author: zhengyu <[email protected]>
AuthorDate: Mon Nov 4 20:19:50 2024 +0800
[enhancement](cloud) refine block file cache evict policy (#42451) (#43201)
- Enhance cache performance and reduce cache misses by:
  - Preventing starvation of any particular cache.
  - Improving disk space utilization.
  - Maintaining strict and systematic eviction priorities.
- Enhance the observability of caching strategies, allowing cache
behavior to be intuitively self-explanatory through monitoring.
- Improve code comprehensibility: unify the cache framework to avoid
ad-hoc handling of TTL, increasing the modularity of the code.
- Reduce operational complexity by eliminating and standardizing
configuration items.
Signed-off-by: freemandealer <[email protected]>
---
be/src/common/config.cpp | 4 +-
be/src/io/cache/block_file_cache.cpp | 260 +++-
be/src/io/cache/block_file_cache.h | 27 +-
be/src/io/cache/file_cache_common.cpp | 5 +
be/src/io/cache/file_cache_common.h | 16 +-
be/test/io/cache/block_file_cache_test.cpp | 1911 ++++++++++++++++++++++------
6 files changed, 1789 insertions(+), 434 deletions(-)
diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 27244c23279..bb92cf18b73 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -1030,7 +1030,7 @@ DEFINE_Bool(enable_file_cache_query_limit, "false");
DEFINE_mInt32(file_cache_enter_disk_resource_limit_mode_percent, "90");
DEFINE_mInt32(file_cache_exit_disk_resource_limit_mode_percent, "80");
DEFINE_mBool(enable_read_cache_file_directly, "false");
-DEFINE_mBool(file_cache_enable_evict_from_other_queue_by_size, "false");
+DEFINE_mBool(file_cache_enable_evict_from_other_queue_by_size, "true");
DEFINE_mInt64(file_cache_ttl_valid_check_interval_second, "0"); // zero for
not checking
// If true, evict the ttl cache using LRU when full.
// Otherwise, only expiration can evict ttl and new data won't add to cache
when full.
@@ -1313,7 +1313,7 @@ DEFINE_Int64(num_s3_file_upload_thread_pool_min_thread,
"16");
// The max thread num for S3FileUploadThreadPool
DEFINE_Int64(num_s3_file_upload_thread_pool_max_thread, "64");
// The max ratio for ttl cache's size
-DEFINE_mInt64(max_ttl_cache_ratio, "90");
+DEFINE_mInt64(max_ttl_cache_ratio, "50");
// The maximum jvm heap usage ratio for hdfs write workload
DEFINE_mDouble(max_hdfs_wirter_jni_heap_usage_ratio, "0.5");
// The sleep milliseconds duration when hdfs write exceeds the maximum usage
diff --git a/be/src/io/cache/block_file_cache.cpp
b/be/src/io/cache/block_file_cache.cpp
index cd502d16547..1a840e2dc6f 100644
--- a/be/src/io/cache/block_file_cache.cpp
+++ b/be/src/io/cache/block_file_cache.cpp
@@ -84,6 +84,94 @@ BlockFileCache::BlockFileCache(const std::string&
cache_base_path,
_total_evict_size_metrics = std::make_shared<bvar::Adder<size_t>>(
_cache_base_path.c_str(), "file_cache_total_evict_size");
+
_evict_by_heat_metrics_matrix[FileCacheType::DISPOSABLE][FileCacheType::NORMAL]
=
+ std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
+
"file_cache_evict_by_heat_disposable_to_normal");
+
_evict_by_heat_metrics_matrix[FileCacheType::DISPOSABLE][FileCacheType::INDEX] =
+ std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
+
"file_cache_evict_by_heat_disposable_to_index");
+
_evict_by_heat_metrics_matrix[FileCacheType::DISPOSABLE][FileCacheType::TTL] =
+ std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
+
"file_cache_evict_by_heat_disposable_to_ttl");
+
_evict_by_heat_metrics_matrix[FileCacheType::NORMAL][FileCacheType::DISPOSABLE]
=
+ std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
+
"file_cache_evict_by_heat_normal_to_disposable");
+ _evict_by_heat_metrics_matrix[FileCacheType::NORMAL][FileCacheType::INDEX]
=
+ std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
+
"file_cache_evict_by_heat_normal_to_index");
+ _evict_by_heat_metrics_matrix[FileCacheType::NORMAL][FileCacheType::TTL] =
+ std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
+
"file_cache_evict_by_heat_normal_to_ttl");
+
_evict_by_heat_metrics_matrix[FileCacheType::INDEX][FileCacheType::DISPOSABLE] =
+ std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
+
"file_cache_evict_by_heat_index_to_disposable");
+ _evict_by_heat_metrics_matrix[FileCacheType::INDEX][FileCacheType::NORMAL]
=
+ std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
+
"file_cache_evict_by_heat_index_to_normal");
+ _evict_by_heat_metrics_matrix[FileCacheType::INDEX][FileCacheType::TTL] =
+ std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
+
"file_cache_evict_by_heat_index_to_ttl");
+
_evict_by_heat_metrics_matrix[FileCacheType::TTL][FileCacheType::DISPOSABLE] =
+ std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
+
"file_cache_evict_by_heat_ttl_to_disposable");
+ _evict_by_heat_metrics_matrix[FileCacheType::TTL][FileCacheType::NORMAL] =
+ std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
+
"file_cache_evict_by_heat_ttl_to_normal");
+ _evict_by_heat_metrics_matrix[FileCacheType::TTL][FileCacheType::INDEX] =
+ std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
+
"file_cache_evict_by_heat_ttl_to_index");
+
+ _evict_by_self_lru_metrics_matrix[FileCacheType::DISPOSABLE] =
+ std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
+
"file_cache_evict_by_self_lru_disposable");
+ _evict_by_self_lru_metrics_matrix[FileCacheType::NORMAL] =
+ std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
+
"file_cache_evict_by_self_lru_normal");
+ _evict_by_self_lru_metrics_matrix[FileCacheType::INDEX] =
std::make_shared<bvar::Adder<size_t>>(
+ _cache_base_path.c_str(), "file_cache_evict_by_self_lru_index");
+ _evict_by_self_lru_metrics_matrix[FileCacheType::TTL] =
std::make_shared<bvar::Adder<size_t>>(
+ _cache_base_path.c_str(), "file_cache_evict_by_self_lru_ttl");
+
+
_evict_by_size_metrics_matrix[FileCacheType::DISPOSABLE][FileCacheType::NORMAL]
=
+ std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
+
"file_cache_evict_by_size_disposable_to_normal");
+
_evict_by_size_metrics_matrix[FileCacheType::DISPOSABLE][FileCacheType::INDEX] =
+ std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
+
"file_cache_evict_by_size_disposable_to_index");
+
_evict_by_size_metrics_matrix[FileCacheType::DISPOSABLE][FileCacheType::TTL] =
+ std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
+
"file_cache_evict_by_size_disposable_to_ttl");
+
_evict_by_size_metrics_matrix[FileCacheType::NORMAL][FileCacheType::DISPOSABLE]
=
+ std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
+
"file_cache_evict_by_size_normal_to_disposable");
+ _evict_by_size_metrics_matrix[FileCacheType::NORMAL][FileCacheType::INDEX]
=
+ std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
+
"file_cache_evict_by_size_normal_to_index");
+ _evict_by_size_metrics_matrix[FileCacheType::NORMAL][FileCacheType::TTL] =
+ std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
+
"file_cache_evict_by_size_normal_to_ttl");
+
_evict_by_size_metrics_matrix[FileCacheType::INDEX][FileCacheType::DISPOSABLE] =
+ std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
+
"file_cache_evict_by_size_index_to_disposable");
+ _evict_by_size_metrics_matrix[FileCacheType::INDEX][FileCacheType::NORMAL]
=
+ std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
+
"file_cache_evict_by_size_index_to_normal");
+ _evict_by_size_metrics_matrix[FileCacheType::INDEX][FileCacheType::TTL] =
+ std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
+
"file_cache_evict_by_size_index_to_ttl");
+
_evict_by_size_metrics_matrix[FileCacheType::TTL][FileCacheType::DISPOSABLE] =
+ std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
+
"file_cache_evict_by_size_ttl_to_disposable");
+ _evict_by_size_metrics_matrix[FileCacheType::TTL][FileCacheType::NORMAL] =
+ std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
+
"file_cache_evict_by_size_ttl_to_normal");
+ _evict_by_size_metrics_matrix[FileCacheType::TTL][FileCacheType::INDEX] =
+ std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
+
"file_cache_evict_by_size_ttl_to_index");
+
+ _evict_by_try_release = std::make_shared<bvar::Adder<size_t>>(
+ _cache_base_path.c_str(), "file_cache_evict_by_try_release");
+
_num_read_blocks =
std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
"file_cache_num_read_blocks");
_num_hit_blocks =
std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
@@ -107,6 +195,8 @@ BlockFileCache::BlockFileCache(const std::string&
cache_base_path,
"file_cache_hit_ratio_5m", 0.0);
_hit_ratio_1h =
std::make_shared<bvar::Status<double>>(_cache_base_path.c_str(),
"file_cache_hit_ratio_1h", 0.0);
+ _disk_limit_mode_metrics =
+ std::make_shared<bvar::Status<size_t>>(_cache_base_path.c_str(),
"disk_limit_mode", 0);
_disposable_queue = LRUQueue(cache_settings.disposable_queue_size,
cache_settings.disposable_queue_elements, 60
* 60);
@@ -114,7 +204,7 @@ BlockFileCache::BlockFileCache(const std::string&
cache_base_path,
7 * 24 * 60 * 60);
_normal_queue = LRUQueue(cache_settings.query_queue_size,
cache_settings.query_queue_elements,
24 * 60 * 60);
- _ttl_queue = LRUQueue(std::numeric_limits<int>::max(),
std::numeric_limits<int>::max(),
+ _ttl_queue = LRUQueue(cache_settings.ttl_queue_size,
cache_settings.ttl_queue_elements,
std::numeric_limits<int>::max());
if (cache_settings.storage == "memory") {
@@ -314,14 +404,10 @@ FileBlocks BlockFileCache::get_impl(const UInt128Wrapper&
hash, const CacheConte
if (st.ok()) {
auto& queue = get_queue(origin_type);
queue.remove(cell.queue_iterator.value(), cache_lock);
- if (config::enable_ttl_cache_evict_using_lru) {
- auto& ttl_queue = get_queue(FileCacheType::TTL);
- cell.queue_iterator = ttl_queue.add(
- cell.file_block->get_hash_value(),
cell.file_block->offset(),
- cell.file_block->range().size(), cache_lock);
- } else {
- cell.queue_iterator.reset();
- }
+ auto& ttl_queue = get_queue(FileCacheType::TTL);
+ cell.queue_iterator =
+ ttl_queue.add(cell.file_block->get_hash_value(),
cell.file_block->offset(),
+ cell.file_block->range().size(),
cache_lock);
} else {
LOG_WARNING("Failed to change key meta").error(st);
}
@@ -731,11 +817,10 @@ BlockFileCache::FileBlockCell*
BlockFileCache::add_cell(const UInt128Wrapper& ha
<< " cache_type=" <<
cache_type_to_string(context.cache_type)
<< " error=" << st.msg();
}
- if (cell.file_block->cache_type() != FileCacheType::TTL ||
- config::enable_ttl_cache_evict_using_lru) {
- auto& queue = get_queue(cell.file_block->cache_type());
- cell.queue_iterator = queue.add(hash, offset, size, cache_lock);
- }
+
+ auto& queue = get_queue(cell.file_block->cache_type());
+ cell.queue_iterator = queue.add(hash, offset, size, cache_lock);
+
if (cell.file_block->cache_type() == FileCacheType::TTL) {
if (_key_to_time.find(hash) == _key_to_time.end()) {
_key_to_time[hash] = context.expiration_time;
@@ -749,7 +834,7 @@ BlockFileCache::FileBlockCell*
BlockFileCache::add_cell(const UInt128Wrapper& ha
}
size_t BlockFileCache::try_release() {
- std::lock_guard l(_mutex);
+ std::lock_guard cache_lock(_mutex);
std::vector<FileBlockCell*> trash;
for (auto& [hash, blocks] : _files) {
for (auto& [offset, cell] : blocks) {
@@ -758,11 +843,14 @@ size_t BlockFileCache::try_release() {
}
}
}
+ size_t remove_size = 0;
for (auto& cell : trash) {
FileBlockSPtr file_block = cell->file_block;
std::lock_guard lc(cell->file_block->_mutex);
- remove(file_block, l, lc);
+ remove_size += file_block->range().size();
+ remove(file_block, cache_lock, lc);
}
+ *_evict_by_try_release << remove_size;
LOG(INFO) << "Released " << trash.size() << " blocks in file cache " <<
_cache_base_path;
return trash.size();
}
@@ -841,9 +929,10 @@ void
BlockFileCache::remove_file_blocks_and_clean_time_maps(
void BlockFileCache::find_evict_candidates(LRUQueue& queue, size_t size,
size_t cur_cache_size,
size_t& removed_size,
std::vector<FileBlockCell*>&
to_evict,
- std::lock_guard<std::mutex>&
cache_lock, bool is_ttl) {
+ std::lock_guard<std::mutex>&
cache_lock,
+ size_t& cur_removed_size) {
for (const auto& [entry_key, entry_offset, entry_size] : queue) {
- if (!is_overflow(removed_size, size, cur_cache_size, is_ttl)) {
+ if (!is_overflow(removed_size, size, cur_cache_size)) {
break;
}
auto* cell = get_cell(entry_key, entry_offset, cache_lock);
@@ -861,6 +950,7 @@ void BlockFileCache::find_evict_candidates(LRUQueue& queue,
size_t size, size_t
DCHECK(file_block->_download_state ==
FileBlock::State::DOWNLOADED);
to_evict.push_back(cell);
removed_size += cell_size;
+ cur_removed_size += cell_size;
}
}
}
@@ -886,8 +976,9 @@ bool BlockFileCache::try_reserve_for_ttl_without_lru(size_t
size,
}
std::vector<FileBlockCell*> to_evict;
auto collect_eliminate_fragments = [&](LRUQueue& queue) {
+ size_t cur_removed_size = 0;
find_evict_candidates(queue, size, cur_cache_size, removed_size,
to_evict, cache_lock,
- false);
+ cur_removed_size);
};
if (disposable_queue_size != 0) {
collect_eliminate_fragments(get_queue(FileCacheType::DISPOSABLE));
@@ -914,8 +1005,9 @@ bool BlockFileCache::try_reserve_for_ttl(size_t size,
std::lock_guard<std::mutex
size_t cur_cache_size = _cur_cache_size;
std::vector<FileBlockCell*> to_evict;
+ size_t cur_removed_size = 0;
find_evict_candidates(queue, size, cur_cache_size, removed_size,
to_evict, cache_lock,
- true);
+ cur_removed_size);
remove_file_blocks_and_clean_time_maps(to_evict, cache_lock);
return !is_overflow(removed_size, size, cur_cache_size);
@@ -948,10 +1040,6 @@ bool BlockFileCache::try_reserve(const UInt128Wrapper&
hash, const CacheContext&
size = 5 * size;
}
- if (context.cache_type == FileCacheType::TTL) {
- return try_reserve_for_ttl(size, cache_lock);
- }
-
auto query_context = config::enable_file_cache_query_limit &&
(context.query_id.hi != 0 ||
context.query_id.lo != 0)
? get_query_context(context.query_id,
cache_lock)
@@ -1112,12 +1200,33 @@ void BlockFileCache::remove_if_cached(const
UInt128Wrapper& file_key) {
}
}
-std::vector<FileCacheType> BlockFileCache::get_other_cache_type(FileCacheType
cur_cache_type) {
+std::vector<FileCacheType> BlockFileCache::get_other_cache_type_without_ttl(
+ FileCacheType cur_cache_type) {
switch (cur_cache_type) {
+ case FileCacheType::TTL:
+ return {FileCacheType::DISPOSABLE, FileCacheType::NORMAL,
FileCacheType::INDEX};
case FileCacheType::INDEX:
return {FileCacheType::DISPOSABLE, FileCacheType::NORMAL};
case FileCacheType::NORMAL:
return {FileCacheType::DISPOSABLE, FileCacheType::INDEX};
+ case FileCacheType::DISPOSABLE:
+ return {FileCacheType::NORMAL, FileCacheType::INDEX};
+ default:
+ return {};
+ }
+ return {};
+}
+
+std::vector<FileCacheType> BlockFileCache::get_other_cache_type(FileCacheType
cur_cache_type) {
+ switch (cur_cache_type) {
+ case FileCacheType::TTL:
+ return {FileCacheType::DISPOSABLE, FileCacheType::NORMAL,
FileCacheType::INDEX};
+ case FileCacheType::INDEX:
+ return {FileCacheType::DISPOSABLE, FileCacheType::NORMAL,
FileCacheType::TTL};
+ case FileCacheType::NORMAL:
+ return {FileCacheType::DISPOSABLE, FileCacheType::INDEX,
FileCacheType::TTL};
+ case FileCacheType::DISPOSABLE:
+ return {FileCacheType::NORMAL, FileCacheType::INDEX,
FileCacheType::TTL};
default:
return {};
}
@@ -1143,13 +1252,14 @@ void BlockFileCache::reset_range(const UInt128Wrapper&
hash, size_t offset, size
}
bool BlockFileCache::try_reserve_from_other_queue_by_hot_interval(
- std::vector<FileCacheType> other_cache_types, size_t size, int64_t
cur_time,
- std::lock_guard<std::mutex>& cache_lock) {
+ FileCacheType cur_type, std::vector<FileCacheType> other_cache_types,
size_t size,
+ int64_t cur_time, std::lock_guard<std::mutex>& cache_lock) {
size_t removed_size = 0;
size_t cur_cache_size = _cur_cache_size;
std::vector<FileBlockCell*> to_evict;
for (FileCacheType cache_type : other_cache_types) {
auto& queue = get_queue(cache_type);
+ size_t remove_size_per_type = 0;
for (const auto& [entry_key, entry_offset, entry_size] : queue) {
if (!is_overflow(removed_size, size, cur_cache_size)) {
break;
@@ -1171,39 +1281,48 @@ bool
BlockFileCache::try_reserve_from_other_queue_by_hot_interval(
DCHECK(file_block->_download_state ==
FileBlock::State::DOWNLOADED);
to_evict.push_back(cell);
removed_size += cell_size;
+ remove_size_per_type += cell_size;
}
}
+ *(_evict_by_heat_metrics_matrix[cache_type][cur_type]) <<
remove_size_per_type;
}
remove_file_blocks(to_evict, cache_lock);
return !is_overflow(removed_size, size, cur_cache_size);
}
-bool BlockFileCache::is_overflow(size_t removed_size, size_t need_size, size_t
cur_cache_size,
- bool is_ttl) const {
+bool BlockFileCache::is_overflow(size_t removed_size, size_t need_size,
+ size_t cur_cache_size) const {
bool ret = false;
if (_disk_resource_limit_mode) {
ret = (removed_size < need_size);
} else {
ret = (cur_cache_size + need_size - removed_size > _capacity);
}
- if (is_ttl) {
- size_t ttl_threshold = config::max_ttl_cache_ratio * _capacity / 100;
- return (ret || ((cur_cache_size + need_size - removed_size) >
ttl_threshold));
- }
return ret;
}
bool BlockFileCache::try_reserve_from_other_queue_by_size(
- std::vector<FileCacheType> other_cache_types, size_t size,
+ FileCacheType cur_type, std::vector<FileCacheType> other_cache_types,
size_t size,
std::lock_guard<std::mutex>& cache_lock) {
size_t removed_size = 0;
size_t cur_cache_size = _cur_cache_size;
std::vector<FileBlockCell*> to_evict;
+ // evict in the priority order defined by get_other_cache_type()
for (FileCacheType cache_type : other_cache_types) {
auto& queue = get_queue(cache_type);
+
+ // do not drain a queue that is within its own limit -- i.e., only
+ // queues that have grown beyond their max size are evicted from.
+ size_t cur_queue_size = queue.get_capacity(cache_lock);
+ size_t cur_queue_max_size = queue.get_max_size();
+ if (cur_queue_size <= cur_queue_max_size) {
+ continue;
+ }
+ size_t cur_removed_size = 0;
find_evict_candidates(queue, size, cur_cache_size, removed_size,
to_evict, cache_lock,
- false);
+ cur_removed_size);
+ *(_evict_by_size_metrics_matrix[cache_type][cur_type]) <<
cur_removed_size;
}
remove_file_blocks(to_evict, cache_lock);
return !is_overflow(removed_size, size, cur_cache_size);
@@ -1212,16 +1331,15 @@ bool
BlockFileCache::try_reserve_from_other_queue_by_size(
bool BlockFileCache::try_reserve_from_other_queue(FileCacheType
cur_cache_type, size_t size,
int64_t cur_time,
std::lock_guard<std::mutex>&
cache_lock) {
- // disposable queue cannot reserve other queues
- if (cur_cache_type == FileCacheType::DISPOSABLE) {
- return false;
- }
- auto other_cache_types = get_other_cache_type(cur_cache_type);
- bool reserve_success =
try_reserve_from_other_queue_by_hot_interval(other_cache_types, size,
-
cur_time, cache_lock);
+ // for eviction by hot interval, the TTL queue is currently not a candidate
+ auto other_cache_types = get_other_cache_type_without_ttl(cur_cache_type);
+ bool reserve_success = try_reserve_from_other_queue_by_hot_interval(
+ cur_cache_type, other_cache_types, size, cur_time, cache_lock);
if (reserve_success ||
!config::file_cache_enable_evict_from_other_queue_by_size) {
return reserve_success;
}
+
+ other_cache_types = get_other_cache_type(cur_cache_type);
auto& cur_queue = get_queue(cur_cache_type);
size_t cur_queue_size = cur_queue.get_capacity(cache_lock);
size_t cur_queue_max_size = cur_queue.get_max_size();
@@ -1229,7 +1347,8 @@ bool
BlockFileCache::try_reserve_from_other_queue(FileCacheType cur_cache_type,
if (_cur_cache_size + size > _capacity && cur_queue_size + size >
cur_queue_max_size) {
return false;
}
- return try_reserve_from_other_queue_by_size(other_cache_types, size,
cache_lock);
+ return try_reserve_from_other_queue_by_size(cur_cache_type,
other_cache_types, size,
+ cache_lock);
}
bool BlockFileCache::try_reserve_for_lru(const UInt128Wrapper& hash,
@@ -1245,9 +1364,11 @@ bool BlockFileCache::try_reserve_for_lru(const
UInt128Wrapper& hash,
size_t cur_cache_size = _cur_cache_size;
std::vector<FileBlockCell*> to_evict;
+ size_t cur_removed_size = 0;
find_evict_candidates(queue, size, cur_cache_size, removed_size,
to_evict, cache_lock,
- false);
+ cur_removed_size);
remove_file_blocks(to_evict, cache_lock);
+ *(_evict_by_self_lru_metrics_matrix[context.cache_type]) <<
cur_removed_size;
if (is_overflow(removed_size, size, cur_cache_size)) {
return false;
@@ -1522,6 +1643,7 @@ std::string BlockFileCache::reset_capacity(size_t
new_capacity) {
ss << " ttl_queue released " << queue_released;
}
_disk_resource_limit_mode = true;
+ _disk_limit_mode_metrics->set_value(1);
_async_clear_file_cache = true;
ss << " total_space_released=" << space_released;
}
@@ -1542,6 +1664,7 @@ void BlockFileCache::check_disk_resource_limit() {
}
if (_capacity > _cur_cache_size) {
_disk_resource_limit_mode = false;
+ _disk_limit_mode_metrics->set_value(0);
}
std::pair<int, int> percent;
int ret = disk_used_percentage(_cache_base_path, &percent);
@@ -1567,10 +1690,12 @@ void BlockFileCache::check_disk_resource_limit() {
if (capacity_percentage >=
config::file_cache_enter_disk_resource_limit_mode_percent ||
inode_is_insufficient(inode_percentage)) {
_disk_resource_limit_mode = true;
+ _disk_limit_mode_metrics->set_value(1);
} else if (_disk_resource_limit_mode &&
(capacity_percentage <
config::file_cache_exit_disk_resource_limit_mode_percent) &&
(inode_percentage <
config::file_cache_exit_disk_resource_limit_mode_percent)) {
_disk_resource_limit_mode = false;
+ _disk_limit_mode_metrics->set_value(0);
}
if (_disk_resource_limit_mode) {
// log per mins
@@ -1685,14 +1810,9 @@ void BlockFileCache::modify_expiration_time(const
UInt128Wrapper& hash,
if (st.ok()) {
auto& queue = get_queue(origin_type);
queue.remove(cell.queue_iterator.value(), cache_lock);
- if (config::enable_ttl_cache_evict_using_lru) {
- auto& ttl_queue = get_queue(FileCacheType::TTL);
- cell.queue_iterator =
- ttl_queue.add(hash, cell.file_block->offset(),
- cell.file_block->range().size(),
cache_lock);
- } else {
- cell.queue_iterator.reset();
- }
+ auto& ttl_queue = get_queue(FileCacheType::TTL);
+ cell.queue_iterator = ttl_queue.add(hash,
cell.file_block->offset(),
+
cell.file_block->range().size(), cache_lock);
}
if (!st.ok()) {
LOG_WARNING("").error(st);
@@ -1850,6 +1970,12 @@ std::map<std::string, double>
BlockFileCache::get_stats() {
stats["index_queue_curr_elements"] =
(double)_cur_index_queue_element_count_metrics->get_value();
+ stats["ttl_queue_max_size"] = (double)_ttl_queue.get_max_size();
+ stats["ttl_queue_curr_size"] =
(double)_cur_ttl_cache_lru_queue_cache_size_metrics->get_value();
+ stats["ttl_queue_max_elements"] =
(double)_ttl_queue.get_max_element_size();
+ stats["ttl_queue_curr_elements"] =
+
(double)_cur_ttl_cache_lru_queue_element_count_metrics->get_value();
+
stats["normal_queue_max_size"] = (double)_normal_queue.get_max_size();
stats["normal_queue_curr_size"] =
(double)_cur_normal_queue_element_count_metrics->get_value();
stats["normal_queue_max_elements"] =
(double)_normal_queue.get_max_element_size();
@@ -1866,6 +1992,36 @@ std::map<std::string, double>
BlockFileCache::get_stats() {
return stats;
}
+// For BE unit tests only: reads queue sizes and element counts without taking the cache lock.
+std::map<std::string, double> BlockFileCache::get_stats_unsafe() {
+ std::map<std::string, double> stats;
+ stats["hits_ratio"] = (double)_hit_ratio->get_value();
+ stats["hits_ratio_5m"] = (double)_hit_ratio_5m->get_value();
+ stats["hits_ratio_1h"] = (double)_hit_ratio_1h->get_value();
+
+ stats["index_queue_max_size"] = (double)_index_queue.get_max_size();
+ stats["index_queue_curr_size"] =
(double)_index_queue.get_capacity_unsafe();
+ stats["index_queue_max_elements"] =
(double)_index_queue.get_max_element_size();
+ stats["index_queue_curr_elements"] =
(double)_index_queue.get_elements_num_unsafe();
+
+ stats["ttl_queue_max_size"] = (double)_ttl_queue.get_max_size();
+ stats["ttl_queue_curr_size"] = (double)_ttl_queue.get_capacity_unsafe();
+ stats["ttl_queue_max_elements"] =
(double)_ttl_queue.get_max_element_size();
+ stats["ttl_queue_curr_elements"] =
(double)_ttl_queue.get_elements_num_unsafe();
+
+ stats["normal_queue_max_size"] = (double)_normal_queue.get_max_size();
+ stats["normal_queue_curr_size"] =
(double)_normal_queue.get_capacity_unsafe();
+ stats["normal_queue_max_elements"] =
(double)_normal_queue.get_max_element_size();
+ stats["normal_queue_curr_elements"] =
(double)_normal_queue.get_elements_num_unsafe();
+
+ stats["disposable_queue_max_size"] =
(double)_disposable_queue.get_max_size();
+ stats["disposable_queue_curr_size"] =
(double)_disposable_queue.get_capacity_unsafe();
+ stats["disposable_queue_max_elements"] =
(double)_disposable_queue.get_max_element_size();
+ stats["disposable_queue_curr_elements"] =
(double)_disposable_queue.get_elements_num_unsafe();
+
+ return stats;
+}
+
template void BlockFileCache::remove(FileBlockSPtr file_block,
std::lock_guard<std::mutex>& cache_lock,
std::lock_guard<std::mutex>& block_lock);
diff --git a/be/src/io/cache/block_file_cache.h
b/be/src/io/cache/block_file_cache.h
index ac30e2411fa..c0c66334a2b 100644
--- a/be/src/io/cache/block_file_cache.h
+++ b/be/src/io/cache/block_file_cache.h
@@ -145,6 +145,9 @@ public:
std::map<std::string, double> get_stats();
+ // For BE unit tests only: reads queue stats directly, without taking the cache lock.
+ std::map<std::string, double> get_stats_unsafe();
+
class LRUQueue {
public:
LRUQueue() = default;
@@ -179,6 +182,10 @@ public:
return cache_size;
}
+ size_t get_capacity_unsafe() const { return cache_size; }
+
+ size_t get_elements_num_unsafe() const { return queue.size(); }
+
size_t get_elements_num(std::lock_guard<std::mutex>& /* cache_lock */)
const {
return queue.size();
}
@@ -345,6 +352,7 @@ private:
bool try_reserve_during_async_load(size_t size,
std::lock_guard<std::mutex>& cache_lock);
std::vector<FileCacheType> get_other_cache_type(FileCacheType
cur_cache_type);
+ std::vector<FileCacheType> get_other_cache_type_without_ttl(FileCacheType
cur_cache_type);
bool try_reserve_from_other_queue(FileCacheType cur_cache_type, size_t
offset, int64_t cur_time,
std::lock_guard<std::mutex>& cache_lock);
@@ -390,15 +398,16 @@ private:
void recycle_deleted_blocks();
- bool
try_reserve_from_other_queue_by_hot_interval(std::vector<FileCacheType>
other_cache_types,
+ bool try_reserve_from_other_queue_by_hot_interval(FileCacheType cur_type,
+
std::vector<FileCacheType> other_cache_types,
size_t size, int64_t
cur_time,
std::lock_guard<std::mutex>& cache_lock);
- bool try_reserve_from_other_queue_by_size(std::vector<FileCacheType>
other_cache_types,
+ bool try_reserve_from_other_queue_by_size(FileCacheType cur_type,
+ std::vector<FileCacheType>
other_cache_types,
size_t size,
std::lock_guard<std::mutex>& cache_lock);
- bool is_overflow(size_t removed_size, size_t need_size, size_t
cur_cache_size,
- bool is_ttl = false) const;
+ bool is_overflow(size_t removed_size, size_t need_size, size_t
cur_cache_size) const;
void remove_file_blocks(std::vector<FileBlockCell*>&,
std::lock_guard<std::mutex>&);
@@ -407,7 +416,10 @@ private:
void find_evict_candidates(LRUQueue& queue, size_t size, size_t
cur_cache_size,
size_t& removed_size,
std::vector<FileBlockCell*>& to_evict,
- std::lock_guard<std::mutex>& cache_lock, bool
is_ttl);
+ std::lock_guard<std::mutex>& cache_lock,
size_t& cur_removed_size);
+
+ void recycle_stale_rowset_async_bottom_half();
+
// info
std::string _cache_base_path;
size_t _capacity = 0;
@@ -459,6 +471,10 @@ private:
std::shared_ptr<bvar::Status<size_t>>
_cur_disposable_queue_cache_size_metrics;
std::array<std::shared_ptr<bvar::Adder<size_t>>, 4>
_queue_evict_size_metrics;
std::shared_ptr<bvar::Adder<size_t>> _total_evict_size_metrics;
+ std::shared_ptr<bvar::Adder<size_t>> _evict_by_heat_metrics_matrix[4][4];
+ std::shared_ptr<bvar::Adder<size_t>> _evict_by_size_metrics_matrix[4][4];
+ std::shared_ptr<bvar::Adder<size_t>> _evict_by_self_lru_metrics_matrix[4];
+ std::shared_ptr<bvar::Adder<size_t>> _evict_by_try_release;
std::shared_ptr<bvar::Window<bvar::Adder<size_t>>> _num_hit_blocks_5m;
std::shared_ptr<bvar::Window<bvar::Adder<size_t>>> _num_read_blocks_5m;
@@ -472,6 +488,7 @@ private:
std::shared_ptr<bvar::Status<double>> _hit_ratio;
std::shared_ptr<bvar::Status<double>> _hit_ratio_5m;
std::shared_ptr<bvar::Status<double>> _hit_ratio_1h;
+ std::shared_ptr<bvar::Status<size_t>> _disk_limit_mode_metrics;
};
} // namespace doris::io
diff --git a/be/src/io/cache/file_cache_common.cpp
b/be/src/io/cache/file_cache_common.cpp
index c569ace0011..67487930045 100644
--- a/be/src/io/cache/file_cache_common.cpp
+++ b/be/src/io/cache/file_cache_common.cpp
@@ -34,6 +34,7 @@ std::string FileCacheSettings::to_string() const {
<< ", disposable_queue_elements: " << disposable_queue_elements
<< ", index_queue_size: " << index_queue_size
<< ", index_queue_elements: " << index_queue_elements
+ << ", ttl_queue_size: " << ttl_queue_size << ", ttl_queue_elements: "
<< ttl_queue_elements
<< ", query_queue_size: " << query_queue_size
<< ", query_queue_elements: " << query_queue_elements << ", storage: "
<< storage;
return ss.str();
@@ -58,6 +59,10 @@ FileCacheSettings get_file_cache_settings(size_t capacity,
size_t max_query_cach
std::max(settings.index_queue_size / settings.max_file_block_size,
REMOTE_FS_OBJECTS_CACHE_DEFAULT_ELEMENTS);
+ settings.ttl_queue_size = per_size * config::max_ttl_cache_ratio;
+ settings.ttl_queue_elements = std::max(settings.ttl_queue_size /
settings.max_file_block_size,
+
REMOTE_FS_OBJECTS_CACHE_DEFAULT_ELEMENTS);
+
settings.query_queue_size =
settings.capacity - settings.disposable_queue_size -
settings.index_queue_size;
settings.query_queue_elements =
diff --git a/be/src/io/cache/file_cache_common.h
b/be/src/io/cache/file_cache_common.h
index 21309831a82..30579ba7851 100644
--- a/be/src/io/cache/file_cache_common.h
+++ b/be/src/io/cache/file_cache_common.h
@@ -26,17 +26,17 @@ namespace doris::io {
inline static constexpr size_t REMOTE_FS_OBJECTS_CACHE_DEFAULT_ELEMENTS = 100
* 1024;
inline static constexpr size_t FILE_CACHE_MAX_FILE_BLOCK_SIZE = 1 * 1024 *
1024;
-inline static constexpr size_t DEFAULT_NORMAL_PERCENT = 85;
-inline static constexpr size_t DEFAULT_DISPOSABLE_PERCENT = 10;
+inline static constexpr size_t DEFAULT_NORMAL_PERCENT = 40;
+inline static constexpr size_t DEFAULT_DISPOSABLE_PERCENT = 5;
inline static constexpr size_t DEFAULT_INDEX_PERCENT = 5;
using uint128_t = vectorized::UInt128;
-enum class FileCacheType {
- INDEX,
- NORMAL,
- DISPOSABLE,
- TTL,
+enum FileCacheType {
+ INDEX = 2,
+ NORMAL = 1,
+ DISPOSABLE = 0,
+ TTL = 3,
};
struct UInt128Wrapper {
@@ -93,6 +93,8 @@ struct FileCacheSettings {
size_t index_queue_elements {0};
size_t query_queue_size {0};
size_t query_queue_elements {0};
+ size_t ttl_queue_size {0};
+ size_t ttl_queue_elements {0};
size_t max_file_block_size {0};
size_t max_query_cache_size {0};
std::string storage;
diff --git a/be/test/io/cache/block_file_cache_test.cpp
b/be/test/io/cache/block_file_cache_test.cpp
index f77dc439e95..11e99a48052 100644
--- a/be/test/io/cache/block_file_cache_test.cpp
+++ b/be/test/io/cache/block_file_cache_test.cpp
@@ -81,7 +81,7 @@ constexpr unsigned long long operator"" _kb(unsigned long
long m) {
void assert_range([[maybe_unused]] size_t assert_n, io::FileBlockSPtr
file_block,
const io::FileBlock::Range& expected_range,
io::FileBlock::State expected_state) {
auto range = file_block->range();
-
+ std::cout << "assert_range num: " << assert_n << std::endl;
ASSERT_EQ(range.left, expected_range.left);
ASSERT_EQ(range.right, expected_range.right);
ASSERT_EQ(file_block->state(), expected_state);
@@ -139,7 +139,6 @@ class BlockFileCacheTest : public testing::Test {
public:
static void SetUpTestSuite() {
config::file_cache_enter_disk_resource_limit_mode_percent = 99;
- config::enable_ttl_cache_evict_using_lru = false;
bool exists {false};
ASSERT_TRUE(global_local_filesystem()->exists(caches_dir,
&exists).ok());
if (!exists) {
@@ -1110,8 +1109,10 @@ TEST_F(BlockFileCacheTest, max_ttl_size) {
query_id.hi = 1;
query_id.lo = 1;
io::FileCacheSettings settings;
- settings.query_queue_size = 100000000;
- settings.query_queue_elements = 100000;
+ settings.query_queue_size = 50000000;
+ settings.query_queue_elements = 50000;
+ settings.ttl_queue_size = 50000000;
+ settings.ttl_queue_elements = 50000;
settings.capacity = 100000000;
settings.max_file_block_size = 100000;
settings.max_query_cache_size = 30;
@@ -1136,7 +1137,7 @@ TEST_F(BlockFileCacheTest, max_ttl_size) {
auto holder = cache.get_or_set(key1, offset, 100000, context);
auto blocks = fromHolder(holder);
ASSERT_EQ(blocks.size(), 1);
- if (offset < 90000000) {
+ if (offset < 50000000) {
assert_range(1, blocks[0], io::FileBlock::Range(offset, offset +
99999),
io::FileBlock::State::EMPTY);
ASSERT_TRUE(blocks[0]->get_or_set_downloader() ==
io::FileBlock::get_caller_id());
@@ -1145,7 +1146,79 @@ TEST_F(BlockFileCacheTest, max_ttl_size) {
io::FileBlock::State::DOWNLOADED);
} else {
assert_range(1, blocks[0], io::FileBlock::Range(offset, offset +
99999),
- io::FileBlock::State::SKIP_CACHE);
+ io::FileBlock::State::EMPTY);
+ }
+ blocks.clear();
+ }
+ if (fs::exists(cache_base_path)) {
+ fs::remove_all(cache_base_path);
+ }
+}
+
+TEST_F(BlockFileCacheTest, max_ttl_size_with_other_cache_exist) {
+ if (fs::exists(cache_base_path)) {
+ fs::remove_all(cache_base_path);
+ }
+ fs::create_directories(cache_base_path);
+ TUniqueId query_id;
+ query_id.hi = 1;
+ query_id.lo = 1;
+ io::FileCacheSettings settings;
+ settings.query_queue_size = 50000000;
+ settings.query_queue_elements = 50000;
+ settings.ttl_queue_size = 50000000;
+ settings.ttl_queue_elements = 50000;
+ settings.capacity = 100000000;
+ settings.max_file_block_size = 100000;
+ settings.max_query_cache_size = 30;
+
+ auto key1 = io::BlockFileCache::hash("key5");
+ io::BlockFileCache cache(cache_base_path, settings);
+ ASSERT_TRUE(cache.initialize());
+
+ int i = 0;
+ for (; i < 100; i++) {
+ if (cache.get_async_open_success()) {
+ break;
+ }
+ std::this_thread::sleep_for(std::chrono::milliseconds(1));
+ }
+ ASSERT_TRUE(cache.get_async_open_success());
+
+ // populate the cache with another cache type (NORMAL) first
+ io::CacheContext context;
+ context.cache_type = io::FileCacheType::NORMAL;
+ context.query_id = query_id;
+ int64_t offset = 100000000;
+ for (; offset < 180000000; offset += 100000) {
+ auto holder = cache.get_or_set(key1, offset, 100000, context);
+ auto blocks = fromHolder(holder);
+ ASSERT_EQ(blocks.size(), 1);
+ assert_range(1, blocks[0], io::FileBlock::Range(offset, offset +
99999),
+ io::FileBlock::State::EMPTY);
+ blocks.clear();
+ }
+
+ // then get started with TTL
+ context.cache_type = io::FileCacheType::TTL;
+ context.query_id = query_id;
+ int64_t cur_time = UnixSeconds();
+ context.expiration_time = cur_time + 120;
+ offset = 0;
+ for (; offset < 100000000; offset += 100000) {
+ auto holder = cache.get_or_set(key1, offset, 100000, context);
+ auto blocks = fromHolder(holder);
+ ASSERT_EQ(blocks.size(), 1);
+ if (offset < 50000000) {
+ assert_range(1, blocks[0], io::FileBlock::Range(offset, offset +
99999),
+ io::FileBlock::State::EMPTY);
+ ASSERT_TRUE(blocks[0]->get_or_set_downloader() ==
io::FileBlock::get_caller_id());
+ download(blocks[0]);
+ assert_range(1, blocks[0], io::FileBlock::Range(offset, offset +
99999),
+ io::FileBlock::State::DOWNLOADED);
+ } else {
+ assert_range(1, blocks[0], io::FileBlock::Range(offset, offset +
99999),
+ io::FileBlock::State::EMPTY);
}
blocks.clear();
}
@@ -1195,7 +1268,7 @@ TEST_F(BlockFileCacheTest, max_ttl_size_memory_storage) {
io::FileBlock::State::DOWNLOADED);
} else {
assert_range(1, blocks[0], io::FileBlock::Range(offset, offset +
99999),
- io::FileBlock::State::SKIP_CACHE);
+ io::FileBlock::State::EMPTY);
}
blocks.clear();
}
@@ -2065,7 +2138,9 @@ TEST_F(BlockFileCacheTest, ttl_normal) {
io::FileCacheSettings settings;
settings.query_queue_size = 50;
settings.query_queue_elements = 5;
- settings.capacity = 50;
+ settings.ttl_queue_size = 50;
+ settings.ttl_queue_elements = 5;
+ settings.capacity = 100;
settings.max_file_block_size = 30;
settings.max_query_cache_size = 30;
io::CacheContext context;
@@ -2160,7 +2235,9 @@ TEST_F(BlockFileCacheTest, ttl_modify) {
io::FileCacheSettings settings;
settings.query_queue_size = 30;
settings.query_queue_elements = 5;
- settings.capacity = 30;
+ settings.ttl_queue_size = 30;
+ settings.ttl_queue_elements = 5;
+ settings.capacity = 60;
settings.max_file_block_size = 30;
settings.max_query_cache_size = 30;
io::CacheContext context;
@@ -2314,7 +2391,9 @@ TEST_F(BlockFileCacheTest, ttl_change_to_normal) {
io::FileCacheSettings settings;
settings.query_queue_size = 30;
settings.query_queue_elements = 5;
- settings.capacity = 30;
+ settings.ttl_queue_size = 30;
+ settings.ttl_queue_elements = 5;
+ settings.capacity = 60;
settings.max_file_block_size = 30;
settings.max_query_cache_size = 30;
io::CacheContext context;
@@ -2428,7 +2507,9 @@ TEST_F(BlockFileCacheTest, ttl_change_expiration_time) {
io::FileCacheSettings settings;
settings.query_queue_size = 30;
settings.query_queue_elements = 5;
- settings.capacity = 30;
+ settings.ttl_queue_size = 30;
+ settings.ttl_queue_elements = 5;
+ settings.capacity = 60;
settings.max_file_block_size = 30;
settings.max_query_cache_size = 30;
io::CacheContext context;
@@ -2450,6 +2531,16 @@ TEST_F(BlockFileCacheTest, ttl_change_expiration_time) {
auto holder = cache.get_or_set(key2, 50, 10, context); /// Add range
[50, 59]
auto blocks = fromHolder(holder);
ASSERT_EQ(blocks.size(), 1);
+ // std::cout << "current cache size:" << cache.get_used_cache_size()
<< std::endl;
+ std::cout << "cache capacity:" << cache.capacity() << std::endl;
+ auto map = cache.get_stats_unsafe();
+ for (auto& [key, value] : map) {
+ std::cout << key << " : " << value << std::endl;
+ }
+ auto key1 = io::BlockFileCache::hash("key1");
+ std::cout << cache.dump_structure(key1) << std::endl;
+ std::cout << cache.dump_structure(key2) << std::endl;
+
assert_range(1, blocks[0], io::FileBlock::Range(50, 59),
io::FileBlock::State::EMPTY);
ASSERT_TRUE(blocks[0]->get_or_set_downloader() ==
io::FileBlock::get_caller_id());
download(blocks[0]);
@@ -2532,105 +2623,6 @@ TEST_F(BlockFileCacheTest,
ttl_change_expiration_time_memory_storage) {
}
}
-TEST_F(BlockFileCacheTest, ttl_reverse) {
- if (fs::exists(cache_base_path)) {
- fs::remove_all(cache_base_path);
- }
- fs::create_directories(cache_base_path);
- test_file_cache(io::FileCacheType::NORMAL);
- TUniqueId query_id;
- query_id.hi = 1;
- query_id.lo = 1;
- io::FileCacheSettings settings;
- settings.query_queue_size = 36;
- settings.query_queue_elements = 5;
- settings.capacity = 36;
- settings.max_file_block_size = 7;
- settings.max_query_cache_size = 30;
- io::CacheContext context;
- context.cache_type = io::FileCacheType::TTL;
- context.query_id = query_id;
- int64_t cur_time = UnixSeconds();
- context.expiration_time = cur_time + 180;
- auto key2 = io::BlockFileCache::hash("key2");
- io::BlockFileCache cache(cache_base_path, settings);
- ASSERT_TRUE(cache.initialize());
- for (int i = 0; i < 100; i++) {
- if (cache.get_async_open_success()) {
- break;
- };
- std::this_thread::sleep_for(std::chrono::milliseconds(1));
- }
- ASSERT_TRUE(cache.get_async_open_success());
- for (size_t offset = 0; offset < 30; offset += 6) {
- auto holder = cache.get_or_set(key2, offset, 6, context);
- auto blocks = fromHolder(holder);
- ASSERT_TRUE(blocks[0]->get_or_set_downloader() ==
io::FileBlock::get_caller_id());
- download(blocks[0]);
- }
- {
- auto holder = cache.get_or_set(key2, 50, 7, context); /// Add range
[50, 57]
- auto blocks = fromHolder(holder);
- assert_range(1, blocks[0], io::FileBlock::Range(50, 56),
io::FileBlock::State::SKIP_CACHE);
- }
- {
- context.cache_type = io::FileCacheType::NORMAL;
- auto holder = cache.get_or_set(key2, 50, 7, context); /// Add range
[50, 57]
- auto blocks = fromHolder(holder);
- assert_range(1, blocks[0], io::FileBlock::Range(50, 56),
io::FileBlock::State::SKIP_CACHE);
- }
-
- if (fs::exists(cache_base_path)) {
- fs::remove_all(cache_base_path);
- }
-}
-
-TEST_F(BlockFileCacheTest, ttl_reverse_memory_storage) {
- test_file_cache_memory_storage(io::FileCacheType::NORMAL);
- TUniqueId query_id;
- query_id.hi = 1;
- query_id.lo = 1;
- io::FileCacheSettings settings;
- settings.query_queue_size = 36;
- settings.query_queue_elements = 5;
- settings.capacity = 36;
- settings.max_file_block_size = 7;
- settings.max_query_cache_size = 30;
- settings.storage = "memory";
- io::CacheContext context;
- context.cache_type = io::FileCacheType::TTL;
- context.query_id = query_id;
- int64_t cur_time = UnixSeconds();
- context.expiration_time = cur_time + 180;
- auto key2 = io::BlockFileCache::hash("key2");
- io::BlockFileCache cache(cache_base_path, settings);
- ASSERT_TRUE(cache.initialize());
- for (int i = 0; i < 100; i++) {
- if (cache.get_async_open_success()) {
- break;
- };
- std::this_thread::sleep_for(std::chrono::milliseconds(1));
- }
- ASSERT_TRUE(cache.get_async_open_success());
- for (size_t offset = 0; offset < 30; offset += 6) {
- auto holder = cache.get_or_set(key2, offset, 6, context);
- auto blocks = fromHolder(holder);
- ASSERT_TRUE(blocks[0]->get_or_set_downloader() ==
io::FileBlock::get_caller_id());
- download_into_memory(blocks[0]);
- }
- {
- auto holder = cache.get_or_set(key2, 50, 7, context); /// Add range
[50, 57]
- auto blocks = fromHolder(holder);
- assert_range(1, blocks[0], io::FileBlock::Range(50, 56),
io::FileBlock::State::SKIP_CACHE);
- }
- {
- context.cache_type = io::FileCacheType::NORMAL;
- auto holder = cache.get_or_set(key2, 50, 7, context); /// Add range
[50, 57]
- auto blocks = fromHolder(holder);
- assert_range(1, blocks[0], io::FileBlock::Range(50, 56),
io::FileBlock::State::SKIP_CACHE);
- }
-}
-
TEST_F(BlockFileCacheTest, io_error) {
if (fs::exists(cache_base_path)) {
fs::remove_all(cache_base_path);
@@ -2906,7 +2898,8 @@ TEST_F(BlockFileCacheTest, recyle_cache_async) {
cache.clear_file_cache_async();
while (cache._async_clear_file_cache)
;
- EXPECT_EQ(cache._cur_cache_size, 5);
+ EXPECT_EQ(cache._cur_cache_size, 20); // 0-4 is used again, so all the
cache data in DISPOSABLE
+ // remain unremoved
if (fs::exists(cache_base_path)) {
fs::remove_all(cache_base_path);
}
@@ -4838,10 +4831,7 @@ TEST_F(BlockFileCacheTest, recyle_unvalid_ttl_async) {
}
}
-TEST_F(BlockFileCacheTest, ttl_reserve_wo_evict_using_lru) {
- config::file_cache_ttl_valid_check_interval_second = 4;
- config::enable_ttl_cache_evict_using_lru = false;
-
+TEST_F(BlockFileCacheTest, reset_capacity) {
if (fs::exists(cache_base_path)) {
fs::remove_all(cache_base_path);
}
@@ -4854,18 +4844,26 @@ TEST_F(BlockFileCacheTest,
ttl_reserve_wo_evict_using_lru) {
settings.query_queue_elements = 5;
settings.index_queue_size = 30;
settings.index_queue_elements = 5;
- settings.disposable_queue_size = 0;
- settings.disposable_queue_elements = 0;
- settings.capacity = 60;
+ settings.disposable_queue_size = 30;
+ settings.disposable_queue_elements = 5;
+ settings.capacity = 90;
settings.max_file_block_size = 30;
settings.max_query_cache_size = 30;
io::CacheContext context;
context.query_id = query_id;
auto key = io::BlockFileCache::hash("key1");
+ auto key2 = io::BlockFileCache::hash("key2");
io::BlockFileCache cache(cache_base_path, settings);
- context.cache_type = io::FileCacheType::TTL;
- context.expiration_time = UnixSeconds() + 3600;
-
+ auto sp = SyncPoint::get_instance();
+ Defer defer {[sp] {
+ sp->clear_call_back("BlockFileCache::set_remove_batch");
+ sp->clear_call_back("BlockFileCache::set_sleep_time");
+ }};
+ sp->set_call_back("BlockFileCache::set_sleep_time",
+ [](auto&& args) { *try_any_cast<int64_t*>(args[0]) = 1;
});
+ sp->set_call_back("BlockFileCache::set_remove_batch",
+ [](auto&& args) { *try_any_cast<int*>(args[0]) = 2; });
+ sp->enable_processing();
ASSERT_TRUE(cache.initialize());
for (int i = 0; i < 100; i++) {
if (cache.get_async_open_success()) {
@@ -4873,7 +4871,8 @@ TEST_F(BlockFileCacheTest,
ttl_reserve_wo_evict_using_lru) {
};
std::this_thread::sleep_for(std::chrono::milliseconds(1));
}
- for (int64_t offset = 0; offset < (60 * config::max_ttl_cache_ratio / 100
- 5); offset += 5) {
+ for (int64_t offset = 0; offset < 45; offset += 5) {
+ context.cache_type = static_cast<io::FileCacheType>((offset / 5) % 3);
auto holder = cache.get_or_set(key, offset, 5, context);
auto segments = fromHolder(holder);
ASSERT_EQ(segments.size(), 1);
@@ -4885,50 +4884,55 @@ TEST_F(BlockFileCacheTest,
ttl_reserve_wo_evict_using_lru) {
io::FileBlock::State::DOWNLOADED);
}
context.cache_type = io::FileCacheType::TTL;
- context.expiration_time = UnixSeconds() + 3600;
- for (int64_t offset = 60; offset < 70; offset += 5) {
- auto holder = cache.get_or_set(key, offset, 5, context);
+ int64_t cur_time = UnixSeconds();
+ context.expiration_time = cur_time + 120;
+ for (int64_t offset = 45; offset < 90; offset += 5) {
+ auto holder = cache.get_or_set(key2, offset, 5, context);
auto segments = fromHolder(holder);
ASSERT_EQ(segments.size(), 1);
assert_range(1, segments[0], io::FileBlock::Range(offset, offset + 4),
- io::FileBlock::State::SKIP_CACHE);
+ io::FileBlock::State::EMPTY);
+ ASSERT_TRUE(segments[0]->get_or_set_downloader() ==
io::FileBlock::get_caller_id());
+ download(segments[0]);
+ assert_range(1, segments[0], io::FileBlock::Range(offset, offset + 4),
+ io::FileBlock::State::DOWNLOADED);
}
-
- EXPECT_EQ(cache._cur_cache_size, 50);
- EXPECT_EQ(cache._ttl_queue.cache_size, 0);
+ std::cout << cache.reset_capacity(30) << std::endl;
+ while (cache._async_clear_file_cache)
+ ;
+ EXPECT_EQ(cache._cur_cache_size, 30);
if (fs::exists(cache_base_path)) {
fs::remove_all(cache_base_path);
}
}
-TEST_F(BlockFileCacheTest, ttl_reserve_with_evict_using_lru) {
- config::file_cache_ttl_valid_check_interval_second = 4;
- config::enable_ttl_cache_evict_using_lru = true;
-
+TEST_F(BlockFileCacheTest, change_cache_type1) {
if (fs::exists(cache_base_path)) {
fs::remove_all(cache_base_path);
}
fs::create_directories(cache_base_path);
+ auto sp = SyncPoint::get_instance();
+ sp->set_call_back("FileBlock::change_cache_type", [](auto&& args) {
+ *try_any_cast<Status*>(args[0]) = Status::IOError("inject io error");
+ });
+ sp->enable_processing();
TUniqueId query_id;
query_id.hi = 1;
query_id.lo = 1;
io::FileCacheSettings settings;
settings.query_queue_size = 30;
settings.query_queue_elements = 5;
- settings.index_queue_size = 30;
- settings.index_queue_elements = 5;
- settings.disposable_queue_size = 0;
- settings.disposable_queue_elements = 0;
- settings.capacity = 60;
+ settings.capacity = 30;
settings.max_file_block_size = 30;
settings.max_query_cache_size = 30;
io::CacheContext context;
+ context.cache_type = io::FileCacheType::TTL;
context.query_id = query_id;
- auto key = io::BlockFileCache::hash("key1");
+ int64_t cur_time = UnixSeconds();
+ context.expiration_time = cur_time + 120;
+ int64_t modify_time = cur_time + 5;
+ auto key1 = io::BlockFileCache::hash("key1");
io::BlockFileCache cache(cache_base_path, settings);
- context.cache_type = io::FileCacheType::TTL;
- context.expiration_time = UnixSeconds() + 3600;
-
ASSERT_TRUE(cache.initialize());
for (int i = 0; i < 100; i++) {
if (cache.get_async_open_success()) {
@@ -4936,241 +4940,28 @@ TEST_F(BlockFileCacheTest,
ttl_reserve_with_evict_using_lru) {
};
std::this_thread::sleep_for(std::chrono::milliseconds(1));
}
- for (int64_t offset = 0; offset < (60 * config::max_ttl_cache_ratio /
100); offset += 5) {
- auto holder = cache.get_or_set(key, offset, 5, context);
+ {
+ auto holder = cache.get_or_set(key1, 50, 10, context); /// Add range
[50, 59]
auto segments = fromHolder(holder);
ASSERT_EQ(segments.size(), 1);
- assert_range(1, segments[0], io::FileBlock::Range(offset, offset + 4),
- io::FileBlock::State::EMPTY);
+ assert_range(1, segments[0], io::FileBlock::Range(50, 59),
io::FileBlock::State::EMPTY);
ASSERT_TRUE(segments[0]->get_or_set_downloader() ==
io::FileBlock::get_caller_id());
download(segments[0]);
- assert_range(1, segments[0], io::FileBlock::Range(offset, offset + 4),
+ assert_range(1, segments[0], io::FileBlock::Range(50, 59),
io::FileBlock::State::DOWNLOADED);
+ EXPECT_EQ(segments[0]->cache_type(), io::FileCacheType::TTL);
+ EXPECT_EQ(segments[0]->expiration_time(), context.expiration_time);
}
- context.cache_type = io::FileCacheType::TTL;
- context.expiration_time = UnixSeconds() + 3600;
- for (int64_t offset = 60; offset < 70; offset += 5) {
- auto holder = cache.get_or_set(key, offset, 5, context);
+ context.cache_type = io::FileCacheType::NORMAL;
+ context.expiration_time = 0;
+ {
+ auto holder = cache.get_or_set(key1, 50, 10, context); /// Add range
[50, 59]
auto segments = fromHolder(holder);
ASSERT_EQ(segments.size(), 1);
- assert_range(1, segments[0], io::FileBlock::Range(offset, offset + 4),
- io::FileBlock::State::EMPTY);
- ASSERT_TRUE(segments[0]->get_or_set_downloader() ==
io::FileBlock::get_caller_id());
- download(segments[0]);
- assert_range(1, segments[0], io::FileBlock::Range(offset, offset + 4),
+ assert_range(1, segments[0], io::FileBlock::Range(50, 59),
io::FileBlock::State::DOWNLOADED);
- }
-
- EXPECT_EQ(cache._cur_cache_size, 50);
- EXPECT_EQ(cache._ttl_queue.cache_size, 50);
- if (fs::exists(cache_base_path)) {
- fs::remove_all(cache_base_path);
- }
-}
-
-TEST_F(BlockFileCacheTest,
ttl_reserve_with_evict_using_lru_meet_max_ttl_cache_ratio_limit) {
- config::file_cache_ttl_valid_check_interval_second = 4;
- config::enable_ttl_cache_evict_using_lru = true;
- int old = config::max_ttl_cache_ratio;
- config::max_ttl_cache_ratio = 50;
-
- if (fs::exists(cache_base_path)) {
- fs::remove_all(cache_base_path);
- }
- fs::create_directories(cache_base_path);
- TUniqueId query_id;
- query_id.hi = 1;
- query_id.lo = 1;
- io::FileCacheSettings settings;
- settings.query_queue_size = 30;
- settings.query_queue_elements = 5;
- settings.index_queue_size = 30;
- settings.index_queue_elements = 5;
- settings.disposable_queue_size = 0;
- settings.disposable_queue_elements = 0;
- settings.capacity = 60;
- settings.max_file_block_size = 30;
- settings.max_query_cache_size = 30;
- io::CacheContext context;
- context.query_id = query_id;
- auto key = io::BlockFileCache::hash("key1");
- io::BlockFileCache cache(cache_base_path, settings);
- context.cache_type = io::FileCacheType::TTL;
- context.expiration_time = UnixSeconds() + 3600;
-
- ASSERT_TRUE(cache.initialize());
- for (int i = 0; i < 100; i++) {
- if (cache.get_async_open_success()) {
- break;
- };
- std::this_thread::sleep_for(std::chrono::milliseconds(1));
- }
- for (int64_t offset = 0; offset < (60 * config::max_ttl_cache_ratio /
100); offset += 5) {
- auto holder = cache.get_or_set(key, offset, 5, context);
- auto segments = fromHolder(holder);
- ASSERT_EQ(segments.size(), 1);
- assert_range(1, segments[0], io::FileBlock::Range(offset, offset + 4),
- io::FileBlock::State::EMPTY);
- ASSERT_TRUE(segments[0]->get_or_set_downloader() ==
io::FileBlock::get_caller_id());
- download(segments[0]);
- assert_range(1, segments[0], io::FileBlock::Range(offset, offset + 4),
- io::FileBlock::State::DOWNLOADED);
- }
- EXPECT_EQ(cache._cur_cache_size, 30);
- EXPECT_EQ(cache._ttl_queue.cache_size, 30);
- context.cache_type = io::FileCacheType::TTL;
- context.expiration_time = UnixSeconds() + 3600;
- for (int64_t offset = 60; offset < 70; offset += 5) {
- auto holder = cache.get_or_set(key, offset, 5, context);
- auto segments = fromHolder(holder);
- ASSERT_EQ(segments.size(), 1);
- assert_range(1, segments[0], io::FileBlock::Range(offset, offset + 4),
- io::FileBlock::State::EMPTY);
- ASSERT_TRUE(segments[0]->get_or_set_downloader() ==
io::FileBlock::get_caller_id());
- download(segments[0]);
- assert_range(1, segments[0], io::FileBlock::Range(offset, offset + 4),
- io::FileBlock::State::DOWNLOADED);
- }
-
- EXPECT_EQ(cache._cur_cache_size, 30);
- EXPECT_EQ(cache._ttl_queue.cache_size, 30);
- if (fs::exists(cache_base_path)) {
- fs::remove_all(cache_base_path);
- }
- config::max_ttl_cache_ratio = old;
-}
-
-TEST_F(BlockFileCacheTest, reset_capacity) {
- if (fs::exists(cache_base_path)) {
- fs::remove_all(cache_base_path);
- }
- fs::create_directories(cache_base_path);
- TUniqueId query_id;
- query_id.hi = 1;
- query_id.lo = 1;
- io::FileCacheSettings settings;
- settings.query_queue_size = 30;
- settings.query_queue_elements = 5;
- settings.index_queue_size = 30;
- settings.index_queue_elements = 5;
- settings.disposable_queue_size = 30;
- settings.disposable_queue_elements = 5;
- settings.capacity = 90;
- settings.max_file_block_size = 30;
- settings.max_query_cache_size = 30;
- io::CacheContext context;
- context.query_id = query_id;
- auto key = io::BlockFileCache::hash("key1");
- auto key2 = io::BlockFileCache::hash("key2");
- io::BlockFileCache cache(cache_base_path, settings);
- auto sp = SyncPoint::get_instance();
- Defer defer {[sp] {
- sp->clear_call_back("BlockFileCache::set_remove_batch");
- sp->clear_call_back("BlockFileCache::set_sleep_time");
- }};
- sp->set_call_back("BlockFileCache::set_sleep_time",
- [](auto&& args) { *try_any_cast<int64_t*>(args[0]) = 1;
});
- sp->set_call_back("BlockFileCache::set_remove_batch",
- [](auto&& args) { *try_any_cast<int*>(args[0]) = 2; });
- sp->enable_processing();
- ASSERT_TRUE(cache.initialize());
- for (int i = 0; i < 100; i++) {
- if (cache.get_async_open_success()) {
- break;
- };
- std::this_thread::sleep_for(std::chrono::milliseconds(1));
- }
- for (int64_t offset = 0; offset < 45; offset += 5) {
- context.cache_type = static_cast<io::FileCacheType>((offset / 5) % 3);
- auto holder = cache.get_or_set(key, offset, 5, context);
- auto segments = fromHolder(holder);
- ASSERT_EQ(segments.size(), 1);
- assert_range(1, segments[0], io::FileBlock::Range(offset, offset + 4),
- io::FileBlock::State::EMPTY);
- ASSERT_TRUE(segments[0]->get_or_set_downloader() ==
io::FileBlock::get_caller_id());
- download(segments[0]);
- assert_range(1, segments[0], io::FileBlock::Range(offset, offset + 4),
- io::FileBlock::State::DOWNLOADED);
- }
- context.cache_type = io::FileCacheType::TTL;
- int64_t cur_time = UnixSeconds();
- context.expiration_time = cur_time + 120;
- for (int64_t offset = 45; offset < 90; offset += 5) {
- auto holder = cache.get_or_set(key2, offset, 5, context);
- auto segments = fromHolder(holder);
- ASSERT_EQ(segments.size(), 1);
- assert_range(1, segments[0], io::FileBlock::Range(offset, offset + 4),
- io::FileBlock::State::EMPTY);
- ASSERT_TRUE(segments[0]->get_or_set_downloader() ==
io::FileBlock::get_caller_id());
- download(segments[0]);
- assert_range(1, segments[0], io::FileBlock::Range(offset, offset + 4),
- io::FileBlock::State::DOWNLOADED);
- }
- std::cout << cache.reset_capacity(30) << std::endl;
- while (cache._async_clear_file_cache)
- ;
- EXPECT_EQ(cache._cur_cache_size, 30);
- if (fs::exists(cache_base_path)) {
- fs::remove_all(cache_base_path);
- }
-}
-
-TEST_F(BlockFileCacheTest, change_cache_type1) {
- if (fs::exists(cache_base_path)) {
- fs::remove_all(cache_base_path);
- }
- fs::create_directories(cache_base_path);
- auto sp = SyncPoint::get_instance();
- sp->set_call_back("FileBlock::change_cache_type", [](auto&& args) {
- *try_any_cast<Status*>(args[0]) = Status::IOError("inject io error");
- });
- sp->enable_processing();
- TUniqueId query_id;
- query_id.hi = 1;
- query_id.lo = 1;
- io::FileCacheSettings settings;
- settings.query_queue_size = 30;
- settings.query_queue_elements = 5;
- settings.capacity = 30;
- settings.max_file_block_size = 30;
- settings.max_query_cache_size = 30;
- io::CacheContext context;
- context.cache_type = io::FileCacheType::TTL;
- context.query_id = query_id;
- int64_t cur_time = UnixSeconds();
- context.expiration_time = cur_time + 120;
- int64_t modify_time = cur_time + 5;
- auto key1 = io::BlockFileCache::hash("key1");
- io::BlockFileCache cache(cache_base_path, settings);
- ASSERT_TRUE(cache.initialize());
- for (int i = 0; i < 100; i++) {
- if (cache.get_async_open_success()) {
- break;
- };
- std::this_thread::sleep_for(std::chrono::milliseconds(1));
- }
- {
- auto holder = cache.get_or_set(key1, 50, 10, context); /// Add range
[50, 59]
- auto segments = fromHolder(holder);
- ASSERT_EQ(segments.size(), 1);
- assert_range(1, segments[0], io::FileBlock::Range(50, 59),
io::FileBlock::State::EMPTY);
- ASSERT_TRUE(segments[0]->get_or_set_downloader() ==
io::FileBlock::get_caller_id());
- download(segments[0]);
- assert_range(1, segments[0], io::FileBlock::Range(50, 59),
- io::FileBlock::State::DOWNLOADED);
- EXPECT_EQ(segments[0]->cache_type(), io::FileCacheType::TTL);
- EXPECT_EQ(segments[0]->expiration_time(), context.expiration_time);
- }
- context.cache_type = io::FileCacheType::NORMAL;
- context.expiration_time = 0;
- {
- auto holder = cache.get_or_set(key1, 50, 10, context); /// Add range
[50, 59]
- auto segments = fromHolder(holder);
- ASSERT_EQ(segments.size(), 1);
- assert_range(1, segments[0], io::FileBlock::Range(50, 59),
- io::FileBlock::State::DOWNLOADED);
- EXPECT_EQ(segments[0]->cache_type(), io::FileCacheType::NORMAL);
- EXPECT_EQ(segments[0]->expiration_time(), 0);
+ EXPECT_EQ(segments[0]->cache_type(), io::FileCacheType::NORMAL);
+ EXPECT_EQ(segments[0]->expiration_time(), 0);
}
sp->clear_call_back("FileBlock::change_cache_type");
context.cache_type = io::FileCacheType::TTL;
@@ -5493,4 +5284,1388 @@ TEST_F(BlockFileCacheTest,
file_cache_path_storage_parse) {
}
}
+TEST_F(BlockFileCacheTest, populate_empty_cache_with_disposable) {
+ if (fs::exists(cache_base_path)) {
+ fs::remove_all(cache_base_path);
+ }
+ fs::create_directories(cache_base_path);
+ TUniqueId query_id;
+ query_id.hi = 1;
+ query_id.lo = 1;
+ io::FileCacheSettings settings;
+
+ settings.ttl_queue_size = 5000000;
+ settings.ttl_queue_elements = 50000;
+ settings.query_queue_size = 3000000;
+ settings.query_queue_elements = 30000;
+ settings.index_queue_size = 1000000;
+ settings.index_queue_elements = 10000;
+ settings.disposable_queue_size = 1000000;
+ settings.disposable_queue_elements = 10000;
+ settings.capacity = 10000000;
+ settings.max_file_block_size = 100000;
+ settings.max_query_cache_size = 30;
+
+ size_t limit = 1000000;
+ size_t cache_max = 10000000;
+ io::CacheContext context;
+ context.cache_type = io::FileCacheType::DISPOSABLE;
+ context.query_id = query_id;
+ // int64_t cur_time = UnixSeconds();
+ // context.expiration_time = cur_time + 120;
+ auto key1 = io::BlockFileCache::hash("key1");
+ io::BlockFileCache cache(cache_base_path, settings);
+ ASSERT_TRUE(cache.initialize());
+ int i = 0;
+ for (; i < 100; i++) {
+ if (cache.get_async_open_success()) {
+ break;
+ }
+ std::this_thread::sleep_for(std::chrono::milliseconds(1));
+ }
+ ASSERT_TRUE(cache.get_async_open_success());
+ int64_t offset = 0;
+ // fill the cache to its limit
+ for (; offset < limit; offset += 100000) {
+ auto holder = cache.get_or_set(key1, offset, 100000, context);
+ auto blocks = fromHolder(holder);
+ ASSERT_EQ(blocks.size(), 1);
+
+ assert_range(1, blocks[0], io::FileBlock::Range(offset, offset +
99999),
+ io::FileBlock::State::EMPTY);
+ ASSERT_TRUE(blocks[0]->get_or_set_downloader() ==
io::FileBlock::get_caller_id());
+ download(blocks[0]);
+ assert_range(2, blocks[0], io::FileBlock::Range(offset, offset +
99999),
+ io::FileBlock::State::DOWNLOADED);
+
+ blocks.clear();
+ }
+ // grab more, exceeding the limit, until the max cache capacity is reached
+ for (; offset < cache_max; offset += 100000) {
+ auto holder = cache.get_or_set(key1, offset, 100000, context);
+ auto blocks = fromHolder(holder);
+ ASSERT_EQ(blocks.size(), 1);
+
+ assert_range(3, blocks[0], io::FileBlock::Range(offset, offset +
99999),
+ io::FileBlock::State::EMPTY);
+ ASSERT_TRUE(blocks[0]->get_or_set_downloader() ==
io::FileBlock::get_caller_id());
+ download(blocks[0]);
+ assert_range(4, blocks[0], io::FileBlock::Range(offset, offset +
99999),
+ io::FileBlock::State::DOWNLOADED);
+
+ blocks.clear();
+ }
+ ASSERT_EQ(cache.get_stats_unsafe()["disposable_queue_curr_size"],
cache_max);
+ ASSERT_EQ(cache.get_stats_unsafe()["ttl_queue_curr_size"], 0);
+ ASSERT_EQ(cache.get_stats_unsafe()["index_queue_curr_size"], 0);
+ ASSERT_EQ(cache.get_stats_unsafe()["normal_queue_curr_size"], 0);
+
ASSERT_EQ(cache._evict_by_self_lru_metrics_matrix[FileCacheType::DISPOSABLE]->get_value(),
0);
+
+ // grab even more, exceeding the cache capacity
+ size_t exceed = 2000000;
+ for (; offset < (cache_max + exceed); offset += 100000) {
+ auto holder = cache.get_or_set(key1, offset, 100000, context);
+ auto blocks = fromHolder(holder);
+ ASSERT_EQ(blocks.size(), 1);
+
+ assert_range(5, blocks[0], io::FileBlock::Range(offset, offset +
99999),
+ io::FileBlock::State::EMPTY);
+ ASSERT_TRUE(blocks[0]->get_or_set_downloader() ==
io::FileBlock::get_caller_id());
+ download(blocks[0]);
+ assert_range(6, blocks[0], io::FileBlock::Range(offset, offset +
99999),
+ io::FileBlock::State::DOWNLOADED);
+
+ blocks.clear();
+ }
+ ASSERT_EQ(cache.get_stats_unsafe()["disposable_queue_curr_size"],
cache_max);
+ ASSERT_EQ(cache.get_stats_unsafe()["ttl_queue_curr_size"], 0);
+ ASSERT_EQ(cache.get_stats_unsafe()["index_queue_curr_size"], 0);
+ ASSERT_EQ(cache.get_stats_unsafe()["normal_queue_curr_size"], 0);
+
ASSERT_EQ(cache._evict_by_self_lru_metrics_matrix[FileCacheType::DISPOSABLE]->get_value(),
+ exceed);
+
+ if (fs::exists(cache_base_path)) {
+ fs::remove_all(cache_base_path);
+ }
+}
+
+TEST_F(BlockFileCacheTest, populate_empty_cache_with_normal) {
+ if (fs::exists(cache_base_path)) {
+ fs::remove_all(cache_base_path);
+ }
+ fs::create_directories(cache_base_path);
+ TUniqueId query_id;
+ query_id.hi = 1;
+ query_id.lo = 1;
+ io::FileCacheSettings settings;
+
+ settings.ttl_queue_size = 5000000;
+ settings.ttl_queue_elements = 50000;
+ settings.query_queue_size = 3000000;
+ settings.query_queue_elements = 30000;
+ settings.index_queue_size = 1000000;
+ settings.index_queue_elements = 10000;
+ settings.disposable_queue_size = 1000000;
+ settings.disposable_queue_elements = 10000;
+ settings.capacity = 10000000;
+ settings.max_file_block_size = 100000;
+ settings.max_query_cache_size = 30;
+
+ size_t limit = 3000000;
+ size_t cache_max = 10000000;
+ io::CacheContext context;
+ context.cache_type = io::FileCacheType::NORMAL;
+ context.query_id = query_id;
+ // int64_t cur_time = UnixSeconds();
+ // context.expiration_time = cur_time + 120;
+ auto key1 = io::BlockFileCache::hash("key1");
+ io::BlockFileCache cache(cache_base_path, settings);
+ ASSERT_TRUE(cache.initialize());
+ int i = 0;
+ for (; i < 100; i++) {
+ if (cache.get_async_open_success()) {
+ break;
+ }
+ std::this_thread::sleep_for(std::chrono::milliseconds(1));
+ }
+ ASSERT_TRUE(cache.get_async_open_success());
+ int64_t offset = 0;
+ // fill the cache to its limit
+ for (; offset < limit; offset += 100000) {
+ auto holder = cache.get_or_set(key1, offset, 100000, context);
+ auto blocks = fromHolder(holder);
+ ASSERT_EQ(blocks.size(), 1);
+
+ assert_range(1, blocks[0], io::FileBlock::Range(offset, offset +
99999),
+ io::FileBlock::State::EMPTY);
+ ASSERT_TRUE(blocks[0]->get_or_set_downloader() ==
io::FileBlock::get_caller_id());
+ download(blocks[0]);
+ assert_range(2, blocks[0], io::FileBlock::Range(offset, offset +
99999),
+ io::FileBlock::State::DOWNLOADED);
+
+ blocks.clear();
+ }
+ // grab more, exceeding the limit, until the max cache capacity is reached
+ for (; offset < cache_max; offset += 100000) {
+ auto holder = cache.get_or_set(key1, offset, 100000, context);
+ auto blocks = fromHolder(holder);
+ ASSERT_EQ(blocks.size(), 1);
+
+ assert_range(3, blocks[0], io::FileBlock::Range(offset, offset +
99999),
+ io::FileBlock::State::EMPTY);
+ ASSERT_TRUE(blocks[0]->get_or_set_downloader() ==
io::FileBlock::get_caller_id());
+ download(blocks[0]);
+ assert_range(4, blocks[0], io::FileBlock::Range(offset, offset +
99999),
+ io::FileBlock::State::DOWNLOADED);
+
+ blocks.clear();
+ }
+ ASSERT_EQ(cache.get_stats_unsafe()["disposable_queue_curr_size"], 0);
+ ASSERT_EQ(cache.get_stats_unsafe()["ttl_queue_curr_size"], 0);
+ ASSERT_EQ(cache.get_stats_unsafe()["index_queue_curr_size"], 0);
+ ASSERT_EQ(cache.get_stats_unsafe()["normal_queue_curr_size"], cache_max);
+
ASSERT_EQ(cache._evict_by_self_lru_metrics_matrix[FileCacheType::NORMAL]->get_value(),
0);
+
+ // grab more exceed the cache capacity
+ size_t exceed = 2000000;
+ for (; offset < (cache_max + exceed); offset += 100000) {
+ auto holder = cache.get_or_set(key1, offset, 100000, context);
+ auto blocks = fromHolder(holder);
+ ASSERT_EQ(blocks.size(), 1);
+
+ assert_range(5, blocks[0], io::FileBlock::Range(offset, offset +
99999),
+ io::FileBlock::State::EMPTY);
+ ASSERT_TRUE(blocks[0]->get_or_set_downloader() ==
io::FileBlock::get_caller_id());
+ download(blocks[0]);
+ assert_range(6, blocks[0], io::FileBlock::Range(offset, offset +
99999),
+ io::FileBlock::State::DOWNLOADED);
+
+ blocks.clear();
+ }
+ ASSERT_EQ(cache.get_stats_unsafe()["disposable_queue_curr_size"], 0);
+ ASSERT_EQ(cache.get_stats_unsafe()["ttl_queue_curr_size"], 0);
+ ASSERT_EQ(cache.get_stats_unsafe()["index_queue_curr_size"], 0);
+ ASSERT_EQ(cache.get_stats_unsafe()["normal_queue_curr_size"], cache_max);
+
ASSERT_EQ(cache._evict_by_self_lru_metrics_matrix[FileCacheType::NORMAL]->get_value(),
exceed);
+
+ if (fs::exists(cache_base_path)) {
+ fs::remove_all(cache_base_path);
+ }
+}
+
+TEST_F(BlockFileCacheTest, populate_empty_cache_with_index) { // INDEX-only requests against an empty cache
+ if (fs::exists(cache_base_path)) { // start from a clean cache directory
+ fs::remove_all(cache_base_path);
+ }
+ fs::create_directories(cache_base_path);
+ TUniqueId query_id;
+ query_id.hi = 1;
+ query_id.lo = 1;
+ io::FileCacheSettings settings;
+ // The four queue sizes below (5000000 + 3000000 + 1000000 + 1000000) add up to capacity (10000000).
+ settings.ttl_queue_size = 5000000;
+ settings.ttl_queue_elements = 50000;
+ settings.query_queue_size = 3000000;
+ settings.query_queue_elements = 30000;
+ settings.index_queue_size = 1000000;
+ settings.index_queue_elements = 10000;
+ settings.disposable_queue_size = 1000000;
+ settings.disposable_queue_elements = 10000;
+ settings.capacity = 10000000;
+ settings.max_file_block_size = 100000;
+ settings.max_query_cache_size = 30;
+ // limit has the same value as settings.index_queue_size; cache_max the same value as settings.capacity.
+ size_t limit = 1000000;
+ size_t cache_max = 10000000;
+ io::CacheContext context;
+ context.cache_type = io::FileCacheType::INDEX;
+ context.query_id = query_id;
+ // int64_t cur_time = UnixSeconds();
+ // context.expiration_time = cur_time + 120;
+ auto key1 = io::BlockFileCache::hash("key1");
+ io::BlockFileCache cache(cache_base_path, settings);
+ ASSERT_TRUE(cache.initialize());
+ int i = 0; // poll get_async_open_success() up to 100 times, sleeping 1 ms between attempts
+ for (; i < 100; i++) {
+ if (cache.get_async_open_success()) {
+ break;
+ }
+ std::this_thread::sleep_for(std::chrono::milliseconds(1));
+ }
+ ASSERT_TRUE(cache.get_async_open_success());
+ int64_t offset = 0;
+ // Phase 1: request INDEX data in 100000-byte ranges until offset reaches limit.
+ for (; offset < limit; offset += 100000) {
+ auto holder = cache.get_or_set(key1, offset, 100000, context);
+ auto blocks = fromHolder(holder);
+ ASSERT_EQ(blocks.size(), 1);
+ // Each new range: expect an EMPTY block, become its downloader, download it, expect DOWNLOADED.
+ assert_range(1, blocks[0], io::FileBlock::Range(offset, offset +
99999),
+ io::FileBlock::State::EMPTY);
+ ASSERT_TRUE(blocks[0]->get_or_set_downloader() ==
io::FileBlock::get_caller_id());
+ download(blocks[0]);
+ assert_range(2, blocks[0], io::FileBlock::Range(offset, offset +
99999),
+ io::FileBlock::State::DOWNLOADED);
+
+ blocks.clear();
+ }
+ // Phase 2: keep requesting past limit until offset reaches cache_max; the checks after the loop expect no self-LRU eviction yet.
+ for (; offset < cache_max; offset += 100000) {
+ auto holder = cache.get_or_set(key1, offset, 100000, context);
+ auto blocks = fromHolder(holder);
+ ASSERT_EQ(blocks.size(), 1);
+
+ assert_range(3, blocks[0], io::FileBlock::Range(offset, offset +
99999),
+ io::FileBlock::State::EMPTY);
+ ASSERT_TRUE(blocks[0]->get_or_set_downloader() ==
io::FileBlock::get_caller_id());
+ download(blocks[0]);
+ assert_range(4, blocks[0], io::FileBlock::Range(offset, offset +
99999),
+ io::FileBlock::State::DOWNLOADED);
+
+ blocks.clear();
+ }
+ ASSERT_EQ(cache.get_stats_unsafe()["disposable_queue_curr_size"], 0);
+ ASSERT_EQ(cache.get_stats_unsafe()["ttl_queue_curr_size"], 0);
+ ASSERT_EQ(cache.get_stats_unsafe()["index_queue_curr_size"], cache_max); // expected to exceed index_queue_size and occupy all of cache_max
+ ASSERT_EQ(cache.get_stats_unsafe()["normal_queue_curr_size"], 0);
+
ASSERT_EQ(cache._evict_by_self_lru_metrics_matrix[FileCacheType::INDEX]->get_value(),
0);
+
+ // Phase 3: request `exceed` more bytes beyond cache_max; the checks after the loop expect the INDEX size to stay at cache_max.
+ size_t exceed = 2000000;
+ for (; offset < (cache_max + exceed); offset += 100000) {
+ auto holder = cache.get_or_set(key1, offset, 100000, context);
+ auto blocks = fromHolder(holder);
+ ASSERT_EQ(blocks.size(), 1);
+
+ assert_range(5, blocks[0], io::FileBlock::Range(offset, offset +
99999),
+ io::FileBlock::State::EMPTY);
+ ASSERT_TRUE(blocks[0]->get_or_set_downloader() ==
io::FileBlock::get_caller_id());
+ download(blocks[0]);
+ assert_range(6, blocks[0], io::FileBlock::Range(offset, offset +
99999),
+ io::FileBlock::State::DOWNLOADED);
+
+ blocks.clear();
+ }
+ ASSERT_EQ(cache.get_stats_unsafe()["disposable_queue_curr_size"], 0);
+ ASSERT_EQ(cache.get_stats_unsafe()["ttl_queue_curr_size"], 0);
+ ASSERT_EQ(cache.get_stats_unsafe()["index_queue_curr_size"], cache_max);
+ ASSERT_EQ(cache.get_stats_unsafe()["normal_queue_curr_size"], 0);
+
ASSERT_EQ(cache._evict_by_self_lru_metrics_matrix[FileCacheType::INDEX]->get_value(),
exceed);
+ // Clean up the cache directory created by this test.
+ if (fs::exists(cache_base_path)) {
+ fs::remove_all(cache_base_path);
+ }
+}
+
+TEST_F(BlockFileCacheTest, populate_empty_cache_with_ttl) { // TTL-only requests against an empty cache
+ if (fs::exists(cache_base_path)) { // start from a clean cache directory
+ fs::remove_all(cache_base_path);
+ }
+ fs::create_directories(cache_base_path);
+ TUniqueId query_id;
+ query_id.hi = 1;
+ query_id.lo = 1;
+ io::FileCacheSettings settings;
+ // The four queue sizes below (5000000 + 3000000 + 1000000 + 1000000) add up to capacity (10000000).
+ settings.ttl_queue_size = 5000000;
+ settings.ttl_queue_elements = 50000;
+ settings.query_queue_size = 3000000;
+ settings.query_queue_elements = 30000;
+ settings.index_queue_size = 1000000;
+ settings.index_queue_elements = 10000;
+ settings.disposable_queue_size = 1000000;
+ settings.disposable_queue_elements = 10000;
+ settings.capacity = 10000000;
+ settings.max_file_block_size = 100000;
+ settings.max_query_cache_size = 30;
+ // limit has the same value as settings.ttl_queue_size; cache_max the same value as settings.capacity.
+ size_t limit = 5000000;
+ size_t cache_max = 10000000;
+ io::CacheContext context;
+ context.cache_type = io::FileCacheType::TTL;
+ context.query_id = query_id;
+ int64_t cur_time = UnixSeconds();
+ context.expiration_time = cur_time + 120; // TTL context carries an expiration 120 past UnixSeconds() (presumably seconds - confirm)
+ auto key1 = io::BlockFileCache::hash("key1");
+ io::BlockFileCache cache(cache_base_path, settings);
+ ASSERT_TRUE(cache.initialize());
+ int i = 0; // poll get_async_open_success() up to 100 times, sleeping 1 ms between attempts
+ for (; i < 100; i++) {
+ if (cache.get_async_open_success()) {
+ break;
+ }
+ std::this_thread::sleep_for(std::chrono::milliseconds(1));
+ }
+ ASSERT_TRUE(cache.get_async_open_success());
+ int64_t offset = 0;
+ // Phase 1: request TTL data in 100000-byte ranges until offset reaches limit.
+ for (; offset < limit; offset += 100000) {
+ auto holder = cache.get_or_set(key1, offset, 100000, context);
+ auto blocks = fromHolder(holder);
+ ASSERT_EQ(blocks.size(), 1);
+ // Each new range: expect an EMPTY block, become its downloader, download it, expect DOWNLOADED.
+ assert_range(1, blocks[0], io::FileBlock::Range(offset, offset +
99999),
+ io::FileBlock::State::EMPTY);
+ ASSERT_TRUE(blocks[0]->get_or_set_downloader() ==
io::FileBlock::get_caller_id());
+ download(blocks[0]);
+ assert_range(2, blocks[0], io::FileBlock::Range(offset, offset +
99999),
+ io::FileBlock::State::DOWNLOADED);
+
+ blocks.clear();
+ }
+ // Phase 2: keep requesting past limit until offset reaches cache_max; the checks after the loop expect no self-LRU eviction yet.
+ for (; offset < cache_max; offset += 100000) {
+ auto holder = cache.get_or_set(key1, offset, 100000, context);
+ auto blocks = fromHolder(holder);
+ ASSERT_EQ(blocks.size(), 1);
+
+ assert_range(3, blocks[0], io::FileBlock::Range(offset, offset +
99999),
+ io::FileBlock::State::EMPTY);
+ ASSERT_TRUE(blocks[0]->get_or_set_downloader() ==
io::FileBlock::get_caller_id());
+ download(blocks[0]);
+ assert_range(4, blocks[0], io::FileBlock::Range(offset, offset +
99999),
+ io::FileBlock::State::DOWNLOADED);
+
+ blocks.clear();
+ }
+ ASSERT_EQ(cache.get_stats_unsafe()["disposable_queue_curr_size"], 0);
+ ASSERT_EQ(cache.get_stats_unsafe()["ttl_queue_curr_size"], cache_max); // expected to exceed ttl_queue_size and occupy all of cache_max
+ ASSERT_EQ(cache.get_stats_unsafe()["index_queue_curr_size"], 0);
+ ASSERT_EQ(cache.get_stats_unsafe()["normal_queue_curr_size"], 0);
+
ASSERT_EQ(cache._evict_by_self_lru_metrics_matrix[FileCacheType::TTL]->get_value(),
0);
+
+ // Phase 3: request `exceed` more bytes beyond cache_max; the checks after the loop expect the TTL size to stay at cache_max.
+ size_t exceed = 2000000;
+ for (; offset < (cache_max + exceed); offset += 100000) {
+ auto holder = cache.get_or_set(key1, offset, 100000, context);
+ auto blocks = fromHolder(holder);
+ ASSERT_EQ(blocks.size(), 1);
+
+ assert_range(5, blocks[0], io::FileBlock::Range(offset, offset +
99999),
+ io::FileBlock::State::EMPTY);
+ ASSERT_TRUE(blocks[0]->get_or_set_downloader() ==
io::FileBlock::get_caller_id());
+ download(blocks[0]);
+ assert_range(6, blocks[0], io::FileBlock::Range(offset, offset +
99999),
+ io::FileBlock::State::DOWNLOADED);
+
+ blocks.clear();
+ }
+ ASSERT_EQ(cache.get_stats_unsafe()["disposable_queue_curr_size"], 0);
+ ASSERT_EQ(cache.get_stats_unsafe()["ttl_queue_curr_size"], cache_max);
+ ASSERT_EQ(cache.get_stats_unsafe()["index_queue_curr_size"], 0);
+ ASSERT_EQ(cache.get_stats_unsafe()["normal_queue_curr_size"], 0);
+
ASSERT_EQ(cache._evict_by_self_lru_metrics_matrix[FileCacheType::TTL]->get_value(),
exceed);
+ // Clean up the cache directory created by this test.
+ if (fs::exists(cache_base_path)) {
+ fs::remove_all(cache_base_path);
+ }
+}
+
+TEST_F(BlockFileCacheTest, disposable_seize_after_normal) { // DISPOSABLE requests arriving after NORMAL filled the cache
+ if (fs::exists(cache_base_path)) { // start from a clean cache directory
+ fs::remove_all(cache_base_path);
+ }
+ fs::create_directories(cache_base_path);
+ TUniqueId query_id;
+ query_id.hi = 1;
+ query_id.lo = 1;
+ io::FileCacheSettings settings;
+ // The four queue sizes below (5000000 + 3000000 + 1000000 + 1000000) add up to capacity (10000000).
+ settings.ttl_queue_size = 5000000;
+ settings.ttl_queue_elements = 50000;
+ settings.query_queue_size = 3000000;
+ settings.query_queue_elements = 30000;
+ settings.index_queue_size = 1000000;
+ settings.index_queue_elements = 10000;
+ settings.disposable_queue_size = 1000000;
+ settings.disposable_queue_elements = 10000;
+ settings.capacity = 10000000;
+ settings.max_file_block_size = 100000;
+ settings.max_query_cache_size = 30;
+
+ io::BlockFileCache cache(cache_base_path, settings);
+ ASSERT_TRUE(cache.initialize());
+ int i = 0; // poll get_async_open_success() up to 100 times, sleeping 1 ms between attempts
+ for (; i < 100; i++) {
+ if (cache.get_async_open_success()) {
+ break;
+ }
+ std::this_thread::sleep_for(std::chrono::milliseconds(1));
+ }
+ ASSERT_TRUE(cache.get_async_open_success());
+ // limit has the same value as settings.disposable_queue_size; cache_max the same value as settings.capacity.
+ size_t limit = 1000000;
+ size_t cache_max = 10000000;
+
+ io::CacheContext context1;
+ context1.cache_type = io::FileCacheType::NORMAL;
+ context1.query_id = query_id;
+ auto key1 = io::BlockFileCache::hash("key1");
+
+ int64_t offset = 0;
+ // Step 1: fill the whole cache (up to cache_max) with NORMAL blocks under key1, 100000 bytes per request.
+ for (; offset < cache_max; offset += 100000) {
+ auto holder = cache.get_or_set(key1, offset, 100000, context1);
+ auto blocks = fromHolder(holder);
+ ASSERT_EQ(blocks.size(), 1);
+ // Each new range: expect an EMPTY block, become its downloader, download it, expect DOWNLOADED.
+ assert_range(1, blocks[0], io::FileBlock::Range(offset, offset +
99999),
+ io::FileBlock::State::EMPTY);
+ ASSERT_TRUE(blocks[0]->get_or_set_downloader() ==
io::FileBlock::get_caller_id());
+ download(blocks[0]);
+ assert_range(2, blocks[0], io::FileBlock::Range(offset, offset +
99999),
+ io::FileBlock::State::DOWNLOADED);
+
+ blocks.clear();
+ }
+ ASSERT_EQ(cache.get_stats_unsafe()["disposable_queue_curr_size"], 0);
+ ASSERT_EQ(cache.get_stats_unsafe()["ttl_queue_curr_size"], 0);
+ ASSERT_EQ(cache.get_stats_unsafe()["index_queue_curr_size"], 0);
+ ASSERT_EQ(cache.get_stats_unsafe()["normal_queue_curr_size"], cache_max);
+ // Step 2: with the cache full of NORMAL data, request `limit` bytes of DISPOSABLE data under key2.
+ io::CacheContext context2;
+ context2.cache_type = io::FileCacheType::DISPOSABLE;
+ context2.query_id = query_id;
+ auto key2 = io::BlockFileCache::hash("key2");
+ offset = 0;
+ for (; offset < limit; offset += 100000) {
+ auto holder = cache.get_or_set(key2, offset, 100000, context2);
+ auto blocks = fromHolder(holder);
+ ASSERT_EQ(blocks.size(), 1);
+
+ assert_range(3, blocks[0], io::FileBlock::Range(offset, offset +
99999),
+ io::FileBlock::State::EMPTY);
+ ASSERT_TRUE(blocks[0]->get_or_set_downloader() ==
io::FileBlock::get_caller_id());
+ download(blocks[0]);
+ assert_range(4, blocks[0], io::FileBlock::Range(offset, offset +
99999),
+ io::FileBlock::State::DOWNLOADED);
+
+ blocks.clear();
+ }
+ ASSERT_EQ(cache.get_stats_unsafe()["disposable_queue_curr_size"], limit); // DISPOSABLE is expected to have obtained its full limit ...
+ ASSERT_EQ(cache.get_stats_unsafe()["ttl_queue_curr_size"], 0);
+ ASSERT_EQ(cache.get_stats_unsafe()["index_queue_curr_size"], 0);
+ ASSERT_EQ(cache.get_stats_unsafe()["normal_queue_curr_size"], cache_max -
limit);
+
ASSERT_EQ(cache._evict_by_size_metrics_matrix[FileCacheType::NORMAL][FileCacheType::DISPOSABLE]
+ ->get_value(),
+ limit);
+
+ // Step 3: request `exceed` more DISPOSABLE bytes; the checks after the loop expect both queue sizes to stay unchanged.
+ size_t exceed = 2000000;
+ for (; offset < (limit + exceed); offset += 100000) {
+ auto holder = cache.get_or_set(key2, offset, 100000, context2);
+ auto blocks = fromHolder(holder);
+ ASSERT_EQ(blocks.size(), 1);
+
+ assert_range(5, blocks[0], io::FileBlock::Range(offset, offset +
99999),
+ io::FileBlock::State::EMPTY);
+ ASSERT_TRUE(blocks[0]->get_or_set_downloader() ==
io::FileBlock::get_caller_id());
+ download(blocks[0]);
+ assert_range(6, blocks[0], io::FileBlock::Range(offset, offset +
99999),
+ io::FileBlock::State::DOWNLOADED);
+
+ blocks.clear();
+ }
+ ASSERT_EQ(cache.get_stats_unsafe()["disposable_queue_curr_size"], limit);
+ ASSERT_EQ(cache.get_stats_unsafe()["ttl_queue_curr_size"], 0);
+ ASSERT_EQ(cache.get_stats_unsafe()["index_queue_curr_size"], 0);
+ ASSERT_EQ(cache.get_stats_unsafe()["normal_queue_curr_size"], cache_max -
limit);
+
ASSERT_EQ(cache._evict_by_self_lru_metrics_matrix[FileCacheType::DISPOSABLE]->get_value(),
+ exceed);
+ // Clean up the cache directory created by this test.
+ if (fs::exists(cache_base_path)) {
+ fs::remove_all(cache_base_path);
+ }
+}
+
+TEST_F(BlockFileCacheTest, seize_after_full) { // table-driven: one cache type fills the cache, a second type then requests space
+ struct Args {
+ io::FileCacheType first_type; // type used to fill the cache up to cache_max
+ io::FileCacheType second_type; // type that requests space afterwards
+ size_t second_limit; // bytes second_type is expected to end up holding
+ std::string first_metrics; // get_stats_unsafe() key checked for first_type
+ std::string second_metrics; // get_stats_unsafe() key checked for second_type
+ };
+ // All 12 ordered pairs of distinct types. second_limit is 1000000 for DISPOSABLE/INDEX, 5000000 for TTL, 3000000 for NORMAL.
+ std::vector<Args> args_vec = {
+ {io::FileCacheType::NORMAL, io::FileCacheType::DISPOSABLE, 1000000,
+ "normal_queue_curr_size", "disposable_queue_curr_size"},
+ {io::FileCacheType::NORMAL, io::FileCacheType::INDEX, 1000000,
"normal_queue_curr_size",
+ "index_queue_curr_size"},
+ {io::FileCacheType::NORMAL, io::FileCacheType::TTL, 5000000,
"normal_queue_curr_size",
+ "ttl_queue_curr_size"},
+ {io::FileCacheType::DISPOSABLE, io::FileCacheType::NORMAL, 3000000,
+ "disposable_queue_curr_size", "normal_queue_curr_size"},
+ {io::FileCacheType::DISPOSABLE, io::FileCacheType::INDEX, 1000000,
+ "disposable_queue_curr_size", "index_queue_curr_size"},
+ {io::FileCacheType::DISPOSABLE, io::FileCacheType::TTL, 5000000,
+ "disposable_queue_curr_size", "ttl_queue_curr_size"},
+ {io::FileCacheType::INDEX, io::FileCacheType::NORMAL, 3000000,
"index_queue_curr_size",
+ "normal_queue_curr_size"},
+ {io::FileCacheType::INDEX, io::FileCacheType::DISPOSABLE, 1000000,
+ "index_queue_curr_size", "disposable_queue_curr_size"},
+ {io::FileCacheType::INDEX, io::FileCacheType::TTL, 5000000,
"index_queue_curr_size",
+ "ttl_queue_curr_size"},
+ {io::FileCacheType::TTL, io::FileCacheType::NORMAL, 3000000,
"ttl_queue_curr_size",
+ "normal_queue_curr_size"},
+ {io::FileCacheType::TTL, io::FileCacheType::DISPOSABLE, 1000000,
"ttl_queue_curr_size",
+ "disposable_queue_curr_size"},
+ {io::FileCacheType::TTL, io::FileCacheType::INDEX, 1000000,
"ttl_queue_curr_size",
+ "index_queue_curr_size"},
+ };
+ // Each table row runs against its own freshly created cache directory and BlockFileCache instance.
+ for (auto& args : args_vec) {
+ std::cout << "filled with " <<
io::BlockFileCache::cache_type_to_string(args.first_type)
+ << " and seize with "
+ <<
io::BlockFileCache::cache_type_to_string(args.second_type) << std::endl;
+ if (fs::exists(cache_base_path)) {
+ fs::remove_all(cache_base_path);
+ }
+ fs::create_directories(cache_base_path);
+ TUniqueId query_id;
+ query_id.hi = 1;
+ query_id.lo = 1;
+ io::FileCacheSettings settings;
+ // The four queue sizes below (5000000 + 3000000 + 1000000 + 1000000) add up to capacity (10000000).
+ settings.ttl_queue_size = 5000000;
+ settings.ttl_queue_elements = 50000;
+ settings.query_queue_size = 3000000;
+ settings.query_queue_elements = 30000;
+ settings.index_queue_size = 1000000;
+ settings.index_queue_elements = 10000;
+ settings.disposable_queue_size = 1000000;
+ settings.disposable_queue_elements = 10000;
+ settings.capacity = 10000000;
+ settings.max_file_block_size = 100000;
+ settings.max_query_cache_size = 30;
+
+ io::BlockFileCache cache(cache_base_path, settings);
+ ASSERT_TRUE(cache.initialize());
+ int i = 0; // poll get_async_open_success() up to 100 times, sleeping 1 ms between attempts
+ for (; i < 100; i++) {
+ if (cache.get_async_open_success()) {
+ break;
+ }
+ std::this_thread::sleep_for(std::chrono::milliseconds(1));
+ }
+ ASSERT_TRUE(cache.get_async_open_success());
+
+ size_t limit = args.second_limit;
+ size_t cache_max = 10000000;
+
+ io::CacheContext context1;
+ context1.cache_type = args.first_type;
+ context1.query_id = query_id;
+ if (args.first_type == io::FileCacheType::TTL) { // only TTL contexts are given an expiration_time
+ int64_t cur_time = UnixSeconds();
+ context1.expiration_time = cur_time + 120;
+ }
+ auto key1 = io::BlockFileCache::hash("key1");
+
+ int64_t offset = 0;
+ // Step 1: fill the whole cache (up to cache_max) with first_type blocks under key1.
+ for (; offset < cache_max; offset += 100000) {
+ auto holder = cache.get_or_set(key1, offset, 100000, context1);
+ auto blocks = fromHolder(holder);
+ ASSERT_EQ(blocks.size(), 1);
+ // Each new range: expect an EMPTY block, become its downloader, download it, expect DOWNLOADED.
+ assert_range(1, blocks[0], io::FileBlock::Range(offset, offset +
99999),
+ io::FileBlock::State::EMPTY);
+ ASSERT_TRUE(blocks[0]->get_or_set_downloader() ==
io::FileBlock::get_caller_id());
+ download(blocks[0]);
+ assert_range(2, blocks[0], io::FileBlock::Range(offset, offset +
99999),
+ io::FileBlock::State::DOWNLOADED);
+
+ blocks.clear();
+ }
+ ASSERT_EQ(cache.get_stats_unsafe()[args.first_metrics], cache_max);
+ // Step 2: with the cache full of first_type data, request `limit` bytes of second_type data under key2.
+ io::CacheContext context2;
+ context2.cache_type = args.second_type;
+ context2.query_id = query_id;
+ if (context2.cache_type == io::FileCacheType::TTL) { // only TTL contexts are given an expiration_time
+ int64_t cur_time = UnixSeconds();
+ context2.expiration_time = cur_time + 120;
+ }
+ auto key2 = io::BlockFileCache::hash("key2");
+ offset = 0;
+ for (; offset < limit; offset += 100000) {
+ auto holder = cache.get_or_set(key2, offset, 100000, context2);
+ auto blocks = fromHolder(holder);
+ ASSERT_EQ(blocks.size(), 1);
+
+ assert_range(3, blocks[0], io::FileBlock::Range(offset, offset +
99999),
+ io::FileBlock::State::EMPTY);
+ ASSERT_TRUE(blocks[0]->get_or_set_downloader() ==
io::FileBlock::get_caller_id());
+ download(blocks[0]);
+ assert_range(4, blocks[0], io::FileBlock::Range(offset, offset +
99999),
+ io::FileBlock::State::DOWNLOADED);
+
+ blocks.clear();
+ }
+ ASSERT_EQ(cache.get_stats_unsafe()[args.second_metrics], limit); // second_type is expected to hold `limit` bytes, first_type the remainder
+ ASSERT_EQ(cache.get_stats_unsafe()[args.first_metrics], cache_max -
limit);
+ ASSERT_EQ(
+
cache._evict_by_size_metrics_matrix[args.first_type][args.second_type]->get_value(),
+ limit);
+
+ // Step 3: request `exceed` more second_type bytes; the checks after the loop expect both queue sizes to stay unchanged.
+ size_t exceed = 2000000;
+ for (; offset < (limit + exceed); offset += 100000) {
+ auto holder = cache.get_or_set(key2, offset, 100000, context2);
+ auto blocks = fromHolder(holder);
+ ASSERT_EQ(blocks.size(), 1);
+
+ assert_range(5, blocks[0], io::FileBlock::Range(offset, offset +
99999),
+ io::FileBlock::State::EMPTY);
+ ASSERT_TRUE(blocks[0]->get_or_set_downloader() ==
io::FileBlock::get_caller_id());
+ download(blocks[0]);
+ assert_range(6, blocks[0], io::FileBlock::Range(offset, offset +
99999),
+ io::FileBlock::State::DOWNLOADED);
+
+ blocks.clear();
+ }
+ ASSERT_EQ(cache.get_stats_unsafe()[args.second_metrics], limit);
+ ASSERT_EQ(cache.get_stats_unsafe()[args.first_metrics], cache_max -
limit);
+
ASSERT_EQ(cache._evict_by_self_lru_metrics_matrix[args.second_type]->get_value(),
exceed);
+ // Clean up the cache directory before the next table row.
+ if (fs::exists(cache_base_path)) {
+ fs::remove_all(cache_base_path);
+ }
+ }
+}
+
+TEST_F(BlockFileCacheTest, evict_privilege_order_for_disposable) { // DISPOSABLE requests against a cache pre-filled by NORMAL, INDEX and TTL
+ if (fs::exists(cache_base_path)) { // start from a clean cache directory
+ fs::remove_all(cache_base_path);
+ }
+ fs::create_directories(cache_base_path);
+ TUniqueId query_id;
+ query_id.hi = 1;
+ query_id.lo = 1;
+ io::FileCacheSettings settings;
+ // The four queue sizes below (5000000 + 3000000 + 1000000 + 1000000) add up to capacity (10000000).
+ settings.ttl_queue_size = 5000000;
+ settings.ttl_queue_elements = 50000;
+ settings.query_queue_size = 3000000;
+ settings.query_queue_elements = 30000;
+ settings.index_queue_size = 1000000;
+ settings.index_queue_elements = 10000;
+ settings.disposable_queue_size = 1000000;
+ settings.disposable_queue_elements = 10000;
+ settings.capacity = 10000000;
+ settings.max_file_block_size = 100000;
+ settings.max_query_cache_size = 30;
+
+ io::BlockFileCache cache(cache_base_path, settings);
+ ASSERT_TRUE(cache.initialize());
+ int i = 0; // poll get_async_open_success() up to 100 times, sleeping 1 ms between attempts
+ for (; i < 100; i++) {
+ if (cache.get_async_open_success()) {
+ break;
+ }
+ std::this_thread::sleep_for(std::chrono::milliseconds(1));
+ }
+ ASSERT_TRUE(cache.get_async_open_success());
+ // Pre-fill: NORMAL 3500000 + INDEX 1300000 + TTL 5200000 bytes, which together equal capacity (10000000).
+ io::CacheContext context1;
+ context1.cache_type = io::FileCacheType::NORMAL;
+ context1.query_id = query_id;
+ auto key1 = io::BlockFileCache::hash("key1");
+
+ int64_t offset = 0;
+
+ for (; offset < 3500000; offset += 100000) {
+ auto holder = cache.get_or_set(key1, offset, 100000, context1);
+ auto blocks = fromHolder(holder);
+ ASSERT_EQ(blocks.size(), 1);
+ // Each new range: expect an EMPTY block, become its downloader, download it, expect DOWNLOADED.
+ assert_range(1, blocks[0], io::FileBlock::Range(offset, offset +
99999),
+ io::FileBlock::State::EMPTY);
+ ASSERT_TRUE(blocks[0]->get_or_set_downloader() ==
io::FileBlock::get_caller_id());
+ download(blocks[0]);
+ assert_range(2, blocks[0], io::FileBlock::Range(offset, offset +
99999),
+ io::FileBlock::State::DOWNLOADED);
+
+ blocks.clear();
+ }
+ io::CacheContext context2;
+ context2.cache_type = io::FileCacheType::INDEX;
+ context2.query_id = query_id;
+ auto key2 = io::BlockFileCache::hash("key2");
+
+ offset = 0;
+
+ for (; offset < 1300000; offset += 100000) {
+ auto holder = cache.get_or_set(key2, offset, 100000, context2);
+ auto blocks = fromHolder(holder);
+ ASSERT_EQ(blocks.size(), 1);
+
+ assert_range(1, blocks[0], io::FileBlock::Range(offset, offset +
99999),
+ io::FileBlock::State::EMPTY);
+ ASSERT_TRUE(blocks[0]->get_or_set_downloader() ==
io::FileBlock::get_caller_id());
+ download(blocks[0]);
+ assert_range(2, blocks[0], io::FileBlock::Range(offset, offset +
99999),
+ io::FileBlock::State::DOWNLOADED);
+
+ blocks.clear();
+ }
+ io::CacheContext context3;
+ context3.cache_type = io::FileCacheType::TTL;
+ context3.query_id = query_id;
+ context3.expiration_time = UnixSeconds() + 120; // TTL context carries an expiration 120 past UnixSeconds() (presumably seconds - confirm)
+ auto key3 = io::BlockFileCache::hash("key3");
+
+ offset = 0;
+
+ for (; offset < 5200000; offset += 100000) {
+ auto holder = cache.get_or_set(key3, offset, 100000, context3);
+ auto blocks = fromHolder(holder);
+ ASSERT_EQ(blocks.size(), 1);
+
+ assert_range(1, blocks[0], io::FileBlock::Range(offset, offset +
99999),
+ io::FileBlock::State::EMPTY);
+ ASSERT_TRUE(blocks[0]->get_or_set_downloader() ==
io::FileBlock::get_caller_id());
+ download(blocks[0]);
+ assert_range(2, blocks[0], io::FileBlock::Range(offset, offset +
99999),
+ io::FileBlock::State::DOWNLOADED);
+
+ blocks.clear();
+ }
+ ASSERT_EQ(cache.get_stats_unsafe()["disposable_queue_curr_size"], 0);
+ ASSERT_EQ(cache.get_stats_unsafe()["ttl_queue_curr_size"], 5200000);
+ ASSERT_EQ(cache.get_stats_unsafe()["index_queue_curr_size"], 1300000);
+ ASSERT_EQ(cache.get_stats_unsafe()["normal_queue_curr_size"], 3500000);
+
+ // Main step: request 1000000 bytes of DISPOSABLE data under key4 while the cache is full.
+ io::CacheContext context4;
+ context4.cache_type = io::FileCacheType::DISPOSABLE;
+ context4.query_id = query_id;
+ auto key4 = io::BlockFileCache::hash("key4");
+
+ offset = 0;
+
+ for (; offset < 1000000; offset += 100000) {
+ auto holder = cache.get_or_set(key4, offset, 100000, context4);
+ auto blocks = fromHolder(holder);
+ ASSERT_EQ(blocks.size(), 1);
+
+ assert_range(1, blocks[0], io::FileBlock::Range(offset, offset +
99999),
+ io::FileBlock::State::EMPTY);
+ ASSERT_TRUE(blocks[0]->get_or_set_downloader() ==
io::FileBlock::get_caller_id());
+ download(blocks[0]);
+ assert_range(2, blocks[0], io::FileBlock::Range(offset, offset +
99999),
+ io::FileBlock::State::DOWNLOADED);
+
+ blocks.clear();
+ }
+ ASSERT_EQ(cache.get_stats_unsafe()["disposable_queue_curr_size"], 1000000); // expected split afterwards: 1000000 / 5000000 / 1000000 / 3000000
+ ASSERT_EQ(cache.get_stats_unsafe()["ttl_queue_curr_size"], 5000000);
+ ASSERT_EQ(cache.get_stats_unsafe()["index_queue_curr_size"], 1000000);
+ ASSERT_EQ(cache.get_stats_unsafe()["normal_queue_curr_size"], 3000000);
+
ASSERT_EQ(cache._evict_by_size_metrics_matrix[FileCacheType::NORMAL][FileCacheType::DISPOSABLE]
+ ->get_value(),
+ 500000);
+
ASSERT_EQ(cache._evict_by_size_metrics_matrix[FileCacheType::INDEX][FileCacheType::DISPOSABLE]
+ ->get_value(),
+ 300000);
+
ASSERT_EQ(cache._evict_by_size_metrics_matrix[FileCacheType::TTL][FileCacheType::DISPOSABLE]
+ ->get_value(),
+ 200000);
+ // Follow-up: request `exceed` more DISPOSABLE bytes; the checks after the loop expect all sizes and by-size metrics to stay unchanged.
+ size_t exceed = 200000;
+ for (; offset < (1000000 + exceed); offset += 100000) {
+ auto holder = cache.get_or_set(key4, offset, 100000, context4);
+ auto blocks = fromHolder(holder);
+ ASSERT_EQ(blocks.size(), 1);
+
+ assert_range(3, blocks[0], io::FileBlock::Range(offset, offset +
99999),
+ io::FileBlock::State::EMPTY);
+ ASSERT_TRUE(blocks[0]->get_or_set_downloader() ==
io::FileBlock::get_caller_id());
+ download(blocks[0]);
+ assert_range(4, blocks[0], io::FileBlock::Range(offset, offset +
99999),
+ io::FileBlock::State::DOWNLOADED);
+
+ blocks.clear();
+ }
+ ASSERT_EQ(cache.get_stats_unsafe()["disposable_queue_curr_size"], 1000000);
+ ASSERT_EQ(cache.get_stats_unsafe()["ttl_queue_curr_size"], 5000000);
+ ASSERT_EQ(cache.get_stats_unsafe()["index_queue_curr_size"], 1000000);
+ ASSERT_EQ(cache.get_stats_unsafe()["normal_queue_curr_size"], 3000000);
+
ASSERT_EQ(cache._evict_by_size_metrics_matrix[FileCacheType::NORMAL][FileCacheType::DISPOSABLE]
+ ->get_value(),
+ 500000);
+
ASSERT_EQ(cache._evict_by_size_metrics_matrix[FileCacheType::INDEX][FileCacheType::DISPOSABLE]
+ ->get_value(),
+ 300000);
+
ASSERT_EQ(cache._evict_by_size_metrics_matrix[FileCacheType::TTL][FileCacheType::DISPOSABLE]
+ ->get_value(),
+ 200000);
+
ASSERT_EQ(cache._evict_by_self_lru_metrics_matrix[FileCacheType::DISPOSABLE]->get_value(),
+ exceed);
+ // Clean up the cache directory created by this test.
+ if (fs::exists(cache_base_path)) {
+ fs::remove_all(cache_base_path);
+ }
+}
+
+TEST_F(BlockFileCacheTest, evict_privilege_order_for_normal) { // NORMAL requests against a cache pre-filled by DISPOSABLE, INDEX and TTL
+ if (fs::exists(cache_base_path)) { // start from a clean cache directory
+ fs::remove_all(cache_base_path);
+ }
+ fs::create_directories(cache_base_path);
+ TUniqueId query_id;
+ query_id.hi = 1;
+ query_id.lo = 1;
+ io::FileCacheSettings settings;
+ // The four queue sizes below (5000000 + 3000000 + 1000000 + 1000000) add up to capacity (10000000).
+ settings.ttl_queue_size = 5000000;
+ settings.ttl_queue_elements = 50000;
+ settings.query_queue_size = 3000000;
+ settings.query_queue_elements = 30000;
+ settings.index_queue_size = 1000000;
+ settings.index_queue_elements = 10000;
+ settings.disposable_queue_size = 1000000;
+ settings.disposable_queue_elements = 10000;
+ settings.capacity = 10000000;
+ settings.max_file_block_size = 100000;
+ settings.max_query_cache_size = 30;
+
+ io::BlockFileCache cache(cache_base_path, settings);
+ ASSERT_TRUE(cache.initialize());
+ int i = 0; // poll get_async_open_success() up to 100 times, sleeping 1 ms between attempts
+ for (; i < 100; i++) {
+ if (cache.get_async_open_success()) {
+ break;
+ }
+ std::this_thread::sleep_for(std::chrono::milliseconds(1));
+ }
+ ASSERT_TRUE(cache.get_async_open_success());
+ // Pre-fill: DISPOSABLE 1500000 + INDEX 1300000 + TTL 7200000 bytes, which together equal capacity (10000000).
+ io::CacheContext context1;
+ context1.cache_type = io::FileCacheType::DISPOSABLE;
+ context1.query_id = query_id;
+ auto key1 = io::BlockFileCache::hash("key1");
+
+ int64_t offset = 0;
+
+ for (; offset < 1500000; offset += 100000) {
+ auto holder = cache.get_or_set(key1, offset, 100000, context1);
+ auto blocks = fromHolder(holder);
+ ASSERT_EQ(blocks.size(), 1);
+ // Each new range: expect an EMPTY block, become its downloader, download it, expect DOWNLOADED.
+ assert_range(1, blocks[0], io::FileBlock::Range(offset, offset +
99999),
+ io::FileBlock::State::EMPTY);
+ ASSERT_TRUE(blocks[0]->get_or_set_downloader() ==
io::FileBlock::get_caller_id());
+ download(blocks[0]);
+ assert_range(2, blocks[0], io::FileBlock::Range(offset, offset +
99999),
+ io::FileBlock::State::DOWNLOADED);
+
+ blocks.clear();
+ }
+ io::CacheContext context2;
+ context2.cache_type = io::FileCacheType::INDEX;
+ context2.query_id = query_id;
+ auto key2 = io::BlockFileCache::hash("key2");
+
+ offset = 0;
+
+ for (; offset < 1300000; offset += 100000) {
+ auto holder = cache.get_or_set(key2, offset, 100000, context2);
+ auto blocks = fromHolder(holder);
+ ASSERT_EQ(blocks.size(), 1);
+
+ assert_range(1, blocks[0], io::FileBlock::Range(offset, offset +
99999),
+ io::FileBlock::State::EMPTY);
+ ASSERT_TRUE(blocks[0]->get_or_set_downloader() ==
io::FileBlock::get_caller_id());
+ download(blocks[0]);
+ assert_range(2, blocks[0], io::FileBlock::Range(offset, offset +
99999),
+ io::FileBlock::State::DOWNLOADED);
+
+ blocks.clear();
+ }
+ io::CacheContext context3;
+ context3.cache_type = io::FileCacheType::TTL;
+ context3.query_id = query_id;
+ context3.expiration_time = UnixSeconds() + 120; // TTL context carries an expiration 120 past UnixSeconds() (presumably seconds - confirm)
+ auto key3 = io::BlockFileCache::hash("key3");
+
+ offset = 0;
+
+ for (; offset < 7200000; offset += 100000) {
+ auto holder = cache.get_or_set(key3, offset, 100000, context3);
+ auto blocks = fromHolder(holder);
+ ASSERT_EQ(blocks.size(), 1);
+
+ assert_range(1, blocks[0], io::FileBlock::Range(offset, offset +
99999),
+ io::FileBlock::State::EMPTY);
+ ASSERT_TRUE(blocks[0]->get_or_set_downloader() ==
io::FileBlock::get_caller_id());
+ download(blocks[0]);
+ assert_range(2, blocks[0], io::FileBlock::Range(offset, offset +
99999),
+ io::FileBlock::State::DOWNLOADED);
+
+ blocks.clear();
+ }
+ ASSERT_EQ(cache.get_stats_unsafe()["disposable_queue_curr_size"], 1500000);
+ ASSERT_EQ(cache.get_stats_unsafe()["ttl_queue_curr_size"], 7200000);
+ ASSERT_EQ(cache.get_stats_unsafe()["index_queue_curr_size"], 1300000);
+ ASSERT_EQ(cache.get_stats_unsafe()["normal_queue_curr_size"], 0);
+
+ // Main step: request 3000000 bytes of NORMAL data under key4 while the cache is full.
+ io::CacheContext context4;
+ context4.cache_type = io::FileCacheType::NORMAL;
+ context4.query_id = query_id;
+ auto key4 = io::BlockFileCache::hash("key4");
+
+ offset = 0;
+
+ for (; offset < 3000000; offset += 100000) {
+ auto holder = cache.get_or_set(key4, offset, 100000, context4);
+ auto blocks = fromHolder(holder);
+ ASSERT_EQ(blocks.size(), 1);
+
+ assert_range(1, blocks[0], io::FileBlock::Range(offset, offset +
99999),
+ io::FileBlock::State::EMPTY);
+ ASSERT_TRUE(blocks[0]->get_or_set_downloader() ==
io::FileBlock::get_caller_id());
+ download(blocks[0]);
+ assert_range(2, blocks[0], io::FileBlock::Range(offset, offset +
99999),
+ io::FileBlock::State::DOWNLOADED);
+
+ blocks.clear();
+ }
+ ASSERT_EQ(cache.get_stats_unsafe()["disposable_queue_curr_size"], 1000000); // expected split afterwards: 1000000 / 5000000 / 1000000 / 3000000
+ ASSERT_EQ(cache.get_stats_unsafe()["ttl_queue_curr_size"], 5000000);
+ ASSERT_EQ(cache.get_stats_unsafe()["index_queue_curr_size"], 1000000);
+ ASSERT_EQ(cache.get_stats_unsafe()["normal_queue_curr_size"], 3000000);
+
ASSERT_EQ(cache._evict_by_size_metrics_matrix[FileCacheType::DISPOSABLE][FileCacheType::NORMAL]
+ ->get_value(),
+ 500000);
+
ASSERT_EQ(cache._evict_by_size_metrics_matrix[FileCacheType::INDEX][FileCacheType::NORMAL]
+ ->get_value(),
+ 300000);
+
ASSERT_EQ(cache._evict_by_size_metrics_matrix[FileCacheType::TTL][FileCacheType::NORMAL]
+ ->get_value(),
+ 2200000);
+ // Follow-up: request `exceed` more NORMAL bytes; the checks after the loop expect all sizes and by-size metrics to stay unchanged.
+ size_t exceed = 200000;
+ for (; offset < (3000000 + exceed); offset += 100000) {
+ auto holder = cache.get_or_set(key4, offset, 100000, context4);
+ auto blocks = fromHolder(holder);
+ ASSERT_EQ(blocks.size(), 1);
+
+ assert_range(3, blocks[0], io::FileBlock::Range(offset, offset +
99999),
+ io::FileBlock::State::EMPTY);
+ ASSERT_TRUE(blocks[0]->get_or_set_downloader() ==
io::FileBlock::get_caller_id());
+ download(blocks[0]);
+ assert_range(4, blocks[0], io::FileBlock::Range(offset, offset +
99999),
+ io::FileBlock::State::DOWNLOADED);
+
+ blocks.clear();
+ }
+ ASSERT_EQ(cache.get_stats_unsafe()["disposable_queue_curr_size"], 1000000);
+ ASSERT_EQ(cache.get_stats_unsafe()["ttl_queue_curr_size"], 5000000);
+ ASSERT_EQ(cache.get_stats_unsafe()["index_queue_curr_size"], 1000000);
+ ASSERT_EQ(cache.get_stats_unsafe()["normal_queue_curr_size"], 3000000);
+
ASSERT_EQ(cache._evict_by_size_metrics_matrix[FileCacheType::DISPOSABLE][FileCacheType::NORMAL]
+ ->get_value(),
+ 500000);
+
ASSERT_EQ(cache._evict_by_size_metrics_matrix[FileCacheType::INDEX][FileCacheType::NORMAL]
+ ->get_value(),
+ 300000);
+
ASSERT_EQ(cache._evict_by_size_metrics_matrix[FileCacheType::TTL][FileCacheType::NORMAL]
+ ->get_value(),
+ 2200000);
+
ASSERT_EQ(cache._evict_by_self_lru_metrics_matrix[FileCacheType::NORMAL]->get_value(),
exceed);
+ // Clean up the cache directory created by this test.
+ if (fs::exists(cache_base_path)) {
+ fs::remove_all(cache_base_path);
+ }
+}
+
+TEST_F(BlockFileCacheTest, evict_privilege_order_for_index) { // INDEX blocks are written last, after the other three types total settings.capacity
+ if (fs::exists(cache_base_path)) {
+ fs::remove_all(cache_base_path);
+ }
+ fs::create_directories(cache_base_path);
+ TUniqueId query_id;
+ query_id.hi = 1;
+ query_id.lo = 1;
+ io::FileCacheSettings settings;
+ // The four *_queue_size values below add up to settings.capacity (10000000).
+ settings.ttl_queue_size = 5000000;
+ settings.ttl_queue_elements = 50000;
+ settings.query_queue_size = 3000000;
+ settings.query_queue_elements = 30000;
+ settings.index_queue_size = 1000000;
+ settings.index_queue_elements = 10000;
+ settings.disposable_queue_size = 1000000;
+ settings.disposable_queue_elements = 10000;
+ settings.capacity = 10000000;
+ settings.max_file_block_size = 100000;
+ settings.max_query_cache_size = 30;
+ // Create the cache, then poll get_async_open_success() up to 100 times with 1 ms sleeps.
+ io::BlockFileCache cache(cache_base_path, settings);
+ ASSERT_TRUE(cache.initialize());
+ int i = 0;
+ for (; i < 100; i++) {
+ if (cache.get_async_open_success()) {
+ break;
+ }
+ std::this_thread::sleep_for(std::chrono::milliseconds(1));
+ }
+ ASSERT_TRUE(cache.get_async_open_success());
+ // Round 1: DISPOSABLE blocks under key1, 1500000 bytes in total.
+ io::CacheContext context1;
+ context1.cache_type = io::FileCacheType::DISPOSABLE;
+ context1.query_id = query_id;
+ auto key1 = io::BlockFileCache::hash("key1");
+
+ int64_t offset = 0;
+ // Per 100000-byte block: expect EMPTY with this caller as downloader, download it, expect DOWNLOADED.
+ for (; offset < 1500000; offset += 100000) {
+ auto holder = cache.get_or_set(key1, offset, 100000, context1);
+ auto blocks = fromHolder(holder);
+ ASSERT_EQ(blocks.size(), 1);
+
+ assert_range(1, blocks[0], io::FileBlock::Range(offset, offset +
99999),
+ io::FileBlock::State::EMPTY);
+ ASSERT_TRUE(blocks[0]->get_or_set_downloader() ==
io::FileBlock::get_caller_id());
+ download(blocks[0]);
+ assert_range(2, blocks[0], io::FileBlock::Range(offset, offset +
99999),
+ io::FileBlock::State::DOWNLOADED);
+
+ blocks.clear();
+ }
+ io::CacheContext context2;
+ context2.cache_type = io::FileCacheType::NORMAL;
+ context2.query_id = query_id;
+ auto key2 = io::BlockFileCache::hash("key2");
+ // Round 2: NORMAL blocks under key2, 3300000 bytes in total.
+ offset = 0;
+
+ for (; offset < 3300000; offset += 100000) {
+ auto holder = cache.get_or_set(key2, offset, 100000, context2);
+ auto blocks = fromHolder(holder);
+ ASSERT_EQ(blocks.size(), 1);
+
+ assert_range(1, blocks[0], io::FileBlock::Range(offset, offset +
99999),
+ io::FileBlock::State::EMPTY);
+ ASSERT_TRUE(blocks[0]->get_or_set_downloader() ==
io::FileBlock::get_caller_id());
+ download(blocks[0]);
+ assert_range(2, blocks[0], io::FileBlock::Range(offset, offset +
99999),
+ io::FileBlock::State::DOWNLOADED);
+
+ blocks.clear();
+ }
+ io::CacheContext context3;
+ context3.cache_type = io::FileCacheType::TTL;
+ context3.query_id = query_id;
+ context3.expiration_time = UnixSeconds() + 120;
+ auto key3 = io::BlockFileCache::hash("key3");
+ // Round 3: TTL blocks under key3, 5200000 bytes; rounds 1-3 together total 10000000 bytes.
+ offset = 0;
+
+ for (; offset < 5200000; offset += 100000) {
+ auto holder = cache.get_or_set(key3, offset, 100000, context3);
+ auto blocks = fromHolder(holder);
+ ASSERT_EQ(blocks.size(), 1);
+
+ assert_range(1, blocks[0], io::FileBlock::Range(offset, offset +
99999),
+ io::FileBlock::State::EMPTY);
+ ASSERT_TRUE(blocks[0]->get_or_set_downloader() ==
io::FileBlock::get_caller_id());
+ download(blocks[0]);
+ assert_range(2, blocks[0], io::FileBlock::Range(offset, offset +
99999),
+ io::FileBlock::State::DOWNLOADED);
+
+ blocks.clear();
+ }
+ ASSERT_EQ(cache.get_stats_unsafe()["disposable_queue_curr_size"], 1500000);
+ ASSERT_EQ(cache.get_stats_unsafe()["ttl_queue_curr_size"], 5200000);
+ ASSERT_EQ(cache.get_stats_unsafe()["index_queue_curr_size"], 0);
+ ASSERT_EQ(cache.get_stats_unsafe()["normal_queue_curr_size"], 3300000);
+ // Checked above: each queue still holds everything written to it and the INDEX queue is empty.
+ // our hero comes to the stage
+ io::CacheContext context4;
+ context4.cache_type = io::FileCacheType::INDEX;
+ context4.query_id = query_id;
+ auto key4 = io::BlockFileCache::hash("key4");
+
+ offset = 0;
+ // Write 1000000 bytes of INDEX blocks under key4, the same value as settings.index_queue_size.
+ for (; offset < 1000000; offset += 100000) {
+ auto holder = cache.get_or_set(key4, offset, 100000, context4);
+ auto blocks = fromHolder(holder);
+ ASSERT_EQ(blocks.size(), 1);
+
+ assert_range(1, blocks[0], io::FileBlock::Range(offset, offset +
99999),
+ io::FileBlock::State::EMPTY);
+ ASSERT_TRUE(blocks[0]->get_or_set_downloader() ==
io::FileBlock::get_caller_id());
+ download(blocks[0]);
+ assert_range(2, blocks[0], io::FileBlock::Range(offset, offset +
99999),
+ io::FileBlock::State::DOWNLOADED);
+
+ blocks.clear();
+ }
+ ASSERT_EQ(cache.get_stats_unsafe()["disposable_queue_curr_size"], 1000000);
+ ASSERT_EQ(cache.get_stats_unsafe()["ttl_queue_curr_size"], 5000000);
+ ASSERT_EQ(cache.get_stats_unsafe()["index_queue_curr_size"], 1000000);
+ ASSERT_EQ(cache.get_stats_unsafe()["normal_queue_curr_size"], 3000000);
+
ASSERT_EQ(cache._evict_by_size_metrics_matrix[FileCacheType::DISPOSABLE][FileCacheType::INDEX]
+ ->get_value(),
+ 500000);
+
ASSERT_EQ(cache._evict_by_size_metrics_matrix[FileCacheType::NORMAL][FileCacheType::INDEX]
+ ->get_value(),
+ 300000);
+
ASSERT_EQ(cache._evict_by_size_metrics_matrix[FileCacheType::TTL][FileCacheType::INDEX]
+ ->get_value(),
+ 200000);
+ // Checked above: DISPOSABLE, NORMAL and TTL shrank by 500000, 300000 and 200000, the same values as the three counters.
+ size_t exceed = 200000; // further INDEX bytes written on top of the first 1000000
+ for (; offset < (1000000 + exceed); offset += 100000) {
+ auto holder = cache.get_or_set(key4, offset, 100000, context4);
+ auto blocks = fromHolder(holder);
+ ASSERT_EQ(blocks.size(), 1);
+
+ assert_range(3, blocks[0], io::FileBlock::Range(offset, offset +
99999),
+ io::FileBlock::State::EMPTY);
+ ASSERT_TRUE(blocks[0]->get_or_set_downloader() ==
io::FileBlock::get_caller_id());
+ download(blocks[0]);
+ assert_range(4, blocks[0], io::FileBlock::Range(offset, offset +
99999),
+ io::FileBlock::State::DOWNLOADED);
+
+ blocks.clear();
+ }
+ ASSERT_EQ(cache.get_stats_unsafe()["disposable_queue_curr_size"], 1000000);
+ ASSERT_EQ(cache.get_stats_unsafe()["ttl_queue_curr_size"], 5000000);
+ ASSERT_EQ(cache.get_stats_unsafe()["index_queue_curr_size"], 1000000);
+ ASSERT_EQ(cache.get_stats_unsafe()["normal_queue_curr_size"], 3000000);
+
ASSERT_EQ(cache._evict_by_size_metrics_matrix[FileCacheType::DISPOSABLE][FileCacheType::INDEX]
+ ->get_value(),
+ 500000);
+
ASSERT_EQ(cache._evict_by_size_metrics_matrix[FileCacheType::NORMAL][FileCacheType::INDEX]
+ ->get_value(),
+ 300000);
+
ASSERT_EQ(cache._evict_by_size_metrics_matrix[FileCacheType::TTL][FileCacheType::INDEX]
+ ->get_value(),
+ 200000);
+
ASSERT_EQ(cache._evict_by_self_lru_metrics_matrix[FileCacheType::INDEX]->get_value(),
exceed);
+ // Checked above: queue sizes and by-size counters did not change; the INDEX self-LRU counter equals exceed.
+ if (fs::exists(cache_base_path)) {
+ fs::remove_all(cache_base_path);
+ }
+}
+
+TEST_F(BlockFileCacheTest, evict_privilege_order_for_ttl) { // TTL blocks are written last, after the other three types total settings.capacity
+ if (fs::exists(cache_base_path)) {
+ fs::remove_all(cache_base_path);
+ }
+ fs::create_directories(cache_base_path);
+ TUniqueId query_id;
+ query_id.hi = 1;
+ query_id.lo = 1;
+ io::FileCacheSettings settings;
+ // The four *_queue_size values below add up to settings.capacity (10000000).
+ settings.ttl_queue_size = 5000000;
+ settings.ttl_queue_elements = 50000;
+ settings.query_queue_size = 3000000;
+ settings.query_queue_elements = 30000;
+ settings.index_queue_size = 1000000;
+ settings.index_queue_elements = 10000;
+ settings.disposable_queue_size = 1000000;
+ settings.disposable_queue_elements = 10000;
+ settings.capacity = 10000000;
+ settings.max_file_block_size = 100000;
+ settings.max_query_cache_size = 30;
+ // Create the cache, then poll get_async_open_success() up to 100 times with 1 ms sleeps.
+ io::BlockFileCache cache(cache_base_path, settings);
+ ASSERT_TRUE(cache.initialize());
+ int i = 0;
+ for (; i < 100; i++) {
+ if (cache.get_async_open_success()) {
+ break;
+ }
+ std::this_thread::sleep_for(std::chrono::milliseconds(1));
+ }
+ ASSERT_TRUE(cache.get_async_open_success());
+ // Round 1: DISPOSABLE blocks under key1, 1500000 bytes in total.
+ io::CacheContext context1;
+ context1.cache_type = io::FileCacheType::DISPOSABLE;
+ context1.query_id = query_id;
+ auto key1 = io::BlockFileCache::hash("key1");
+
+ int64_t offset = 0;
+ // Per 100000-byte block: expect EMPTY with this caller as downloader, download it, expect DOWNLOADED.
+ for (; offset < 1500000; offset += 100000) {
+ auto holder = cache.get_or_set(key1, offset, 100000, context1);
+ auto blocks = fromHolder(holder);
+ ASSERT_EQ(blocks.size(), 1);
+
+ assert_range(1, blocks[0], io::FileBlock::Range(offset, offset +
99999),
+ io::FileBlock::State::EMPTY);
+ ASSERT_TRUE(blocks[0]->get_or_set_downloader() ==
io::FileBlock::get_caller_id());
+ download(blocks[0]);
+ assert_range(2, blocks[0], io::FileBlock::Range(offset, offset +
99999),
+ io::FileBlock::State::DOWNLOADED);
+
+ blocks.clear();
+ }
+ io::CacheContext context2;
+ context2.cache_type = io::FileCacheType::INDEX;
+ context2.query_id = query_id;
+ auto key2 = io::BlockFileCache::hash("key2");
+ // Round 2: INDEX blocks under key2, 1300000 bytes in total.
+ offset = 0;
+
+ for (; offset < 1300000; offset += 100000) {
+ auto holder = cache.get_or_set(key2, offset, 100000, context2);
+ auto blocks = fromHolder(holder);
+ ASSERT_EQ(blocks.size(), 1);
+
+ assert_range(1, blocks[0], io::FileBlock::Range(offset, offset +
99999),
+ io::FileBlock::State::EMPTY);
+ ASSERT_TRUE(blocks[0]->get_or_set_downloader() ==
io::FileBlock::get_caller_id());
+ download(blocks[0]);
+ assert_range(2, blocks[0], io::FileBlock::Range(offset, offset +
99999),
+ io::FileBlock::State::DOWNLOADED);
+
+ blocks.clear();
+ }
+ io::CacheContext context3;
+ context3.cache_type = io::FileCacheType::NORMAL;
+ context3.query_id = query_id;
+ auto key3 = io::BlockFileCache::hash("key3");
+ // Round 3: NORMAL blocks under key3, 7200000 bytes; rounds 1-3 together total 10000000 bytes.
+ offset = 0;
+
+ for (; offset < 7200000; offset += 100000) {
+ auto holder = cache.get_or_set(key3, offset, 100000, context3);
+ auto blocks = fromHolder(holder);
+ ASSERT_EQ(blocks.size(), 1);
+
+ assert_range(1, blocks[0], io::FileBlock::Range(offset, offset +
99999),
+ io::FileBlock::State::EMPTY);
+ ASSERT_TRUE(blocks[0]->get_or_set_downloader() ==
io::FileBlock::get_caller_id());
+ download(blocks[0]);
+ assert_range(2, blocks[0], io::FileBlock::Range(offset, offset +
99999),
+ io::FileBlock::State::DOWNLOADED);
+
+ blocks.clear();
+ }
+ ASSERT_EQ(cache.get_stats_unsafe()["disposable_queue_curr_size"], 1500000);
+ ASSERT_EQ(cache.get_stats_unsafe()["ttl_queue_curr_size"], 0);
+ ASSERT_EQ(cache.get_stats_unsafe()["index_queue_curr_size"], 1300000);
+ ASSERT_EQ(cache.get_stats_unsafe()["normal_queue_curr_size"], 7200000);
+ // Checked above: each queue still holds everything written to it and the TTL queue is empty.
+ // our hero comes to the stage
+ io::CacheContext context4;
+ context4.cache_type = io::FileCacheType::TTL;
+ context4.query_id = query_id;
+ context4.expiration_time = UnixSeconds() + 120;
+ auto key4 = io::BlockFileCache::hash("key4");
+
+ offset = 0;
+ // Write 5000000 bytes of TTL blocks under key4, the same value as settings.ttl_queue_size.
+ for (; offset < 5000000; offset += 100000) {
+ auto holder = cache.get_or_set(key4, offset, 100000, context4);
+ auto blocks = fromHolder(holder);
+ ASSERT_EQ(blocks.size(), 1);
+
+ assert_range(1, blocks[0], io::FileBlock::Range(offset, offset +
99999),
+ io::FileBlock::State::EMPTY);
+ ASSERT_TRUE(blocks[0]->get_or_set_downloader() ==
io::FileBlock::get_caller_id());
+ download(blocks[0]);
+ assert_range(2, blocks[0], io::FileBlock::Range(offset, offset +
99999),
+ io::FileBlock::State::DOWNLOADED);
+
+ blocks.clear();
+ }
+ ASSERT_EQ(cache.get_stats_unsafe()["disposable_queue_curr_size"], 1000000);
+ ASSERT_EQ(cache.get_stats_unsafe()["ttl_queue_curr_size"], 5000000);
+ ASSERT_EQ(cache.get_stats_unsafe()["index_queue_curr_size"], 1000000);
+ ASSERT_EQ(cache.get_stats_unsafe()["normal_queue_curr_size"], 3000000);
+
ASSERT_EQ(cache._evict_by_size_metrics_matrix[FileCacheType::DISPOSABLE][FileCacheType::TTL]
+ ->get_value(),
+ 500000);
+
ASSERT_EQ(cache._evict_by_size_metrics_matrix[FileCacheType::INDEX][FileCacheType::TTL]
+ ->get_value(),
+ 300000);
+
ASSERT_EQ(cache._evict_by_size_metrics_matrix[FileCacheType::NORMAL][FileCacheType::TTL]
+ ->get_value(),
+ 4200000);
+ // Checked above: DISPOSABLE, INDEX and NORMAL shrank by 500000, 300000 and 4200000, the same values as the three counters.
+ size_t exceed = 200000; // further TTL bytes written on top of the first 5000000
+ for (; offset < (5000000 + exceed); offset += 100000) {
+ auto holder = cache.get_or_set(key4, offset, 100000, context4);
+ auto blocks = fromHolder(holder);
+ ASSERT_EQ(blocks.size(), 1);
+
+ assert_range(3, blocks[0], io::FileBlock::Range(offset, offset +
99999),
+ io::FileBlock::State::EMPTY);
+ ASSERT_TRUE(blocks[0]->get_or_set_downloader() ==
io::FileBlock::get_caller_id());
+ download(blocks[0]);
+ assert_range(4, blocks[0], io::FileBlock::Range(offset, offset +
99999),
+ io::FileBlock::State::DOWNLOADED);
+
+ blocks.clear();
+ }
+ ASSERT_EQ(cache.get_stats_unsafe()["disposable_queue_curr_size"], 1000000);
+ ASSERT_EQ(cache.get_stats_unsafe()["ttl_queue_curr_size"], 5000000);
+ ASSERT_EQ(cache.get_stats_unsafe()["index_queue_curr_size"], 1000000);
+ ASSERT_EQ(cache.get_stats_unsafe()["normal_queue_curr_size"], 3000000);
+
ASSERT_EQ(cache._evict_by_size_metrics_matrix[FileCacheType::DISPOSABLE][FileCacheType::TTL]
+ ->get_value(),
+ 500000);
+
ASSERT_EQ(cache._evict_by_size_metrics_matrix[FileCacheType::INDEX][FileCacheType::TTL]
+ ->get_value(),
+ 300000);
+
ASSERT_EQ(cache._evict_by_size_metrics_matrix[FileCacheType::NORMAL][FileCacheType::TTL]
+ ->get_value(),
+ 4200000);
+
ASSERT_EQ(cache._evict_by_self_lru_metrics_matrix[FileCacheType::TTL]->get_value(),
exceed);
+ // Checked above: queue sizes and by-size counters did not change; the TTL self-LRU counter equals exceed.
+ if (fs::exists(cache_base_path)) {
+ fs::remove_all(cache_base_path);
+ }
+}
+
} // namespace doris::io
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]