This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 8120c598d35 [enhancement](cloud) refine block file cache evict policy 
(#42451) (#43201)
8120c598d35 is described below

commit 8120c598d350368cc726a23ac2060b141818317e
Author: zhengyu <[email protected]>
AuthorDate: Mon Nov 4 20:19:50 2024 +0800

    [enhancement](cloud) refine block file cache evict policy (#42451) (#43201)
    
    - Enhance cache performance and reduce cache misses by:
        - Preventing starvation of any particular cache.
        - Improving disk space utilization.
        - Maintaining strict and systematic eviction priorities.
    - Enhance the observability of caching strategies, allowing cache
    behavior to be intuitively self-explanatory through monitoring.
    - Improve code comprehensibility: unify the cache framework to avoid
    ad-hoc handling of TTL, increasing the modularity of the code.
    - Reduce operational complexity by eliminating and standardizing
    configuration items.
    
    Signed-off-by: freemandealer <[email protected]>
---
 be/src/common/config.cpp                   |    4 +-
 be/src/io/cache/block_file_cache.cpp       |  260 +++-
 be/src/io/cache/block_file_cache.h         |   27 +-
 be/src/io/cache/file_cache_common.cpp      |    5 +
 be/src/io/cache/file_cache_common.h        |   16 +-
 be/test/io/cache/block_file_cache_test.cpp | 1911 ++++++++++++++++++++++------
 6 files changed, 1789 insertions(+), 434 deletions(-)

diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 27244c23279..bb92cf18b73 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -1030,7 +1030,7 @@ DEFINE_Bool(enable_file_cache_query_limit, "false");
 DEFINE_mInt32(file_cache_enter_disk_resource_limit_mode_percent, "90");
 DEFINE_mInt32(file_cache_exit_disk_resource_limit_mode_percent, "80");
 DEFINE_mBool(enable_read_cache_file_directly, "false");
-DEFINE_mBool(file_cache_enable_evict_from_other_queue_by_size, "false");
+DEFINE_mBool(file_cache_enable_evict_from_other_queue_by_size, "true");
 DEFINE_mInt64(file_cache_ttl_valid_check_interval_second, "0"); // zero for 
not checking
 // If true, evict the ttl cache using LRU when full.
 // Otherwise, only expiration can evict ttl and new data won't add to cache 
when full.
@@ -1313,7 +1313,7 @@ DEFINE_Int64(num_s3_file_upload_thread_pool_min_thread, 
"16");
 // The max thread num for S3FileUploadThreadPool
 DEFINE_Int64(num_s3_file_upload_thread_pool_max_thread, "64");
 // The max ratio for ttl cache's size
-DEFINE_mInt64(max_ttl_cache_ratio, "90");
+DEFINE_mInt64(max_ttl_cache_ratio, "50");
 // The maximum jvm heap usage ratio for hdfs write workload
 DEFINE_mDouble(max_hdfs_wirter_jni_heap_usage_ratio, "0.5");
 // The sleep milliseconds duration when hdfs write exceeds the maximum usage
diff --git a/be/src/io/cache/block_file_cache.cpp 
b/be/src/io/cache/block_file_cache.cpp
index cd502d16547..1a840e2dc6f 100644
--- a/be/src/io/cache/block_file_cache.cpp
+++ b/be/src/io/cache/block_file_cache.cpp
@@ -84,6 +84,94 @@ BlockFileCache::BlockFileCache(const std::string& 
cache_base_path,
     _total_evict_size_metrics = std::make_shared<bvar::Adder<size_t>>(
             _cache_base_path.c_str(), "file_cache_total_evict_size");
 
+    
_evict_by_heat_metrics_matrix[FileCacheType::DISPOSABLE][FileCacheType::NORMAL] 
=
+            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
+                                                  
"file_cache_evict_by_heat_disposable_to_normal");
+    
_evict_by_heat_metrics_matrix[FileCacheType::DISPOSABLE][FileCacheType::INDEX] =
+            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
+                                                  
"file_cache_evict_by_heat_disposable_to_index");
+    
_evict_by_heat_metrics_matrix[FileCacheType::DISPOSABLE][FileCacheType::TTL] =
+            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
+                                                  
"file_cache_evict_by_heat_disposable_to_ttl");
+    
_evict_by_heat_metrics_matrix[FileCacheType::NORMAL][FileCacheType::DISPOSABLE] 
=
+            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
+                                                  
"file_cache_evict_by_heat_normal_to_disposable");
+    _evict_by_heat_metrics_matrix[FileCacheType::NORMAL][FileCacheType::INDEX] 
=
+            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
+                                                  
"file_cache_evict_by_heat_normal_to_index");
+    _evict_by_heat_metrics_matrix[FileCacheType::NORMAL][FileCacheType::TTL] =
+            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
+                                                  
"file_cache_evict_by_heat_normal_to_ttl");
+    
_evict_by_heat_metrics_matrix[FileCacheType::INDEX][FileCacheType::DISPOSABLE] =
+            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
+                                                  
"file_cache_evict_by_heat_index_to_disposable");
+    _evict_by_heat_metrics_matrix[FileCacheType::INDEX][FileCacheType::NORMAL] 
=
+            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
+                                                  
"file_cache_evict_by_heat_index_to_normal");
+    _evict_by_heat_metrics_matrix[FileCacheType::INDEX][FileCacheType::TTL] =
+            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
+                                                  
"file_cache_evict_by_heat_index_to_ttl");
+    
_evict_by_heat_metrics_matrix[FileCacheType::TTL][FileCacheType::DISPOSABLE] =
+            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
+                                                  
"file_cache_evict_by_heat_ttl_to_disposable");
+    _evict_by_heat_metrics_matrix[FileCacheType::TTL][FileCacheType::NORMAL] =
+            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
+                                                  
"file_cache_evict_by_heat_ttl_to_normal");
+    _evict_by_heat_metrics_matrix[FileCacheType::TTL][FileCacheType::INDEX] =
+            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
+                                                  
"file_cache_evict_by_heat_ttl_to_index");
+
+    _evict_by_self_lru_metrics_matrix[FileCacheType::DISPOSABLE] =
+            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
+                                                  
"file_cache_evict_by_self_lru_disposable");
+    _evict_by_self_lru_metrics_matrix[FileCacheType::NORMAL] =
+            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
+                                                  
"file_cache_evict_by_self_lru_normal");
+    _evict_by_self_lru_metrics_matrix[FileCacheType::INDEX] = 
std::make_shared<bvar::Adder<size_t>>(
+            _cache_base_path.c_str(), "file_cache_evict_by_self_lru_index");
+    _evict_by_self_lru_metrics_matrix[FileCacheType::TTL] = 
std::make_shared<bvar::Adder<size_t>>(
+            _cache_base_path.c_str(), "file_cache_evict_by_self_lru_ttl");
+
+    
_evict_by_size_metrics_matrix[FileCacheType::DISPOSABLE][FileCacheType::NORMAL] 
=
+            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
+                                                  
"file_cache_evict_by_size_disposable_to_normal");
+    
_evict_by_size_metrics_matrix[FileCacheType::DISPOSABLE][FileCacheType::INDEX] =
+            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
+                                                  
"file_cache_evict_by_size_disposable_to_index");
+    
_evict_by_size_metrics_matrix[FileCacheType::DISPOSABLE][FileCacheType::TTL] =
+            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
+                                                  
"file_cache_evict_by_size_disposable_to_ttl");
+    
_evict_by_size_metrics_matrix[FileCacheType::NORMAL][FileCacheType::DISPOSABLE] 
=
+            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
+                                                  
"file_cache_evict_by_size_normal_to_disposable");
+    _evict_by_size_metrics_matrix[FileCacheType::NORMAL][FileCacheType::INDEX] 
=
+            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
+                                                  
"file_cache_evict_by_size_normal_to_index");
+    _evict_by_size_metrics_matrix[FileCacheType::NORMAL][FileCacheType::TTL] =
+            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
+                                                  
"file_cache_evict_by_size_normal_to_ttl");
+    
_evict_by_size_metrics_matrix[FileCacheType::INDEX][FileCacheType::DISPOSABLE] =
+            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
+                                                  
"file_cache_evict_by_size_index_to_disposable");
+    _evict_by_size_metrics_matrix[FileCacheType::INDEX][FileCacheType::NORMAL] 
=
+            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
+                                                  
"file_cache_evict_by_size_index_to_normal");
+    _evict_by_size_metrics_matrix[FileCacheType::INDEX][FileCacheType::TTL] =
+            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
+                                                  
"file_cache_evict_by_size_index_to_ttl");
+    
_evict_by_size_metrics_matrix[FileCacheType::TTL][FileCacheType::DISPOSABLE] =
+            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
+                                                  
"file_cache_evict_by_size_ttl_to_disposable");
+    _evict_by_size_metrics_matrix[FileCacheType::TTL][FileCacheType::NORMAL] =
+            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
+                                                  
"file_cache_evict_by_size_ttl_to_normal");
+    _evict_by_size_metrics_matrix[FileCacheType::TTL][FileCacheType::INDEX] =
+            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
+                                                  
"file_cache_evict_by_size_ttl_to_index");
+
+    _evict_by_try_release = std::make_shared<bvar::Adder<size_t>>(
+            _cache_base_path.c_str(), "file_cache_evict_by_try_release");
+
     _num_read_blocks = 
std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
                                                              
"file_cache_num_read_blocks");
     _num_hit_blocks = 
std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
@@ -107,6 +195,8 @@ BlockFileCache::BlockFileCache(const std::string& 
cache_base_path,
                                                            
"file_cache_hit_ratio_5m", 0.0);
     _hit_ratio_1h = 
std::make_shared<bvar::Status<double>>(_cache_base_path.c_str(),
                                                            
"file_cache_hit_ratio_1h", 0.0);
+    _disk_limit_mode_metrics =
+            std::make_shared<bvar::Status<size_t>>(_cache_base_path.c_str(), 
"disk_limit_mode", 0);
 
     _disposable_queue = LRUQueue(cache_settings.disposable_queue_size,
                                  cache_settings.disposable_queue_elements, 60 
* 60);
@@ -114,7 +204,7 @@ BlockFileCache::BlockFileCache(const std::string& 
cache_base_path,
                             7 * 24 * 60 * 60);
     _normal_queue = LRUQueue(cache_settings.query_queue_size, 
cache_settings.query_queue_elements,
                              24 * 60 * 60);
-    _ttl_queue = LRUQueue(std::numeric_limits<int>::max(), 
std::numeric_limits<int>::max(),
+    _ttl_queue = LRUQueue(cache_settings.ttl_queue_size, 
cache_settings.ttl_queue_elements,
                           std::numeric_limits<int>::max());
 
     if (cache_settings.storage == "memory") {
@@ -314,14 +404,10 @@ FileBlocks BlockFileCache::get_impl(const UInt128Wrapper& 
hash, const CacheConte
             if (st.ok()) {
                 auto& queue = get_queue(origin_type);
                 queue.remove(cell.queue_iterator.value(), cache_lock);
-                if (config::enable_ttl_cache_evict_using_lru) {
-                    auto& ttl_queue = get_queue(FileCacheType::TTL);
-                    cell.queue_iterator = ttl_queue.add(
-                            cell.file_block->get_hash_value(), 
cell.file_block->offset(),
-                            cell.file_block->range().size(), cache_lock);
-                } else {
-                    cell.queue_iterator.reset();
-                }
+                auto& ttl_queue = get_queue(FileCacheType::TTL);
+                cell.queue_iterator =
+                        ttl_queue.add(cell.file_block->get_hash_value(), 
cell.file_block->offset(),
+                                      cell.file_block->range().size(), 
cache_lock);
             } else {
                 LOG_WARNING("Failed to change key meta").error(st);
             }
@@ -731,11 +817,10 @@ BlockFileCache::FileBlockCell* 
BlockFileCache::add_cell(const UInt128Wrapper& ha
                      << " cache_type=" << 
cache_type_to_string(context.cache_type)
                      << " error=" << st.msg();
     }
-    if (cell.file_block->cache_type() != FileCacheType::TTL ||
-        config::enable_ttl_cache_evict_using_lru) {
-        auto& queue = get_queue(cell.file_block->cache_type());
-        cell.queue_iterator = queue.add(hash, offset, size, cache_lock);
-    }
+
+    auto& queue = get_queue(cell.file_block->cache_type());
+    cell.queue_iterator = queue.add(hash, offset, size, cache_lock);
+
     if (cell.file_block->cache_type() == FileCacheType::TTL) {
         if (_key_to_time.find(hash) == _key_to_time.end()) {
             _key_to_time[hash] = context.expiration_time;
@@ -749,7 +834,7 @@ BlockFileCache::FileBlockCell* 
BlockFileCache::add_cell(const UInt128Wrapper& ha
 }
 
 size_t BlockFileCache::try_release() {
-    std::lock_guard l(_mutex);
+    std::lock_guard cache_lock(_mutex);
     std::vector<FileBlockCell*> trash;
     for (auto& [hash, blocks] : _files) {
         for (auto& [offset, cell] : blocks) {
@@ -758,11 +843,14 @@ size_t BlockFileCache::try_release() {
             }
         }
     }
+    size_t remove_size = 0;
     for (auto& cell : trash) {
         FileBlockSPtr file_block = cell->file_block;
         std::lock_guard lc(cell->file_block->_mutex);
-        remove(file_block, l, lc);
+        remove_size += file_block->range().size();
+        remove(file_block, cache_lock, lc);
     }
+    *_evict_by_try_release << remove_size;
     LOG(INFO) << "Released " << trash.size() << " blocks in file cache " << 
_cache_base_path;
     return trash.size();
 }
@@ -841,9 +929,10 @@ void 
BlockFileCache::remove_file_blocks_and_clean_time_maps(
 void BlockFileCache::find_evict_candidates(LRUQueue& queue, size_t size, 
size_t cur_cache_size,
                                            size_t& removed_size,
                                            std::vector<FileBlockCell*>& 
to_evict,
-                                           std::lock_guard<std::mutex>& 
cache_lock, bool is_ttl) {
+                                           std::lock_guard<std::mutex>& 
cache_lock,
+                                           size_t& cur_removed_size) {
     for (const auto& [entry_key, entry_offset, entry_size] : queue) {
-        if (!is_overflow(removed_size, size, cur_cache_size, is_ttl)) {
+        if (!is_overflow(removed_size, size, cur_cache_size)) {
             break;
         }
         auto* cell = get_cell(entry_key, entry_offset, cache_lock);
@@ -861,6 +950,7 @@ void BlockFileCache::find_evict_candidates(LRUQueue& queue, 
size_t size, size_t
             DCHECK(file_block->_download_state == 
FileBlock::State::DOWNLOADED);
             to_evict.push_back(cell);
             removed_size += cell_size;
+            cur_removed_size += cell_size;
         }
     }
 }
@@ -886,8 +976,9 @@ bool BlockFileCache::try_reserve_for_ttl_without_lru(size_t 
size,
     }
     std::vector<FileBlockCell*> to_evict;
     auto collect_eliminate_fragments = [&](LRUQueue& queue) {
+        size_t cur_removed_size = 0;
         find_evict_candidates(queue, size, cur_cache_size, removed_size, 
to_evict, cache_lock,
-                              false);
+                              cur_removed_size);
     };
     if (disposable_queue_size != 0) {
         collect_eliminate_fragments(get_queue(FileCacheType::DISPOSABLE));
@@ -914,8 +1005,9 @@ bool BlockFileCache::try_reserve_for_ttl(size_t size, 
std::lock_guard<std::mutex
         size_t cur_cache_size = _cur_cache_size;
 
         std::vector<FileBlockCell*> to_evict;
+        size_t cur_removed_size = 0;
         find_evict_candidates(queue, size, cur_cache_size, removed_size, 
to_evict, cache_lock,
-                              true);
+                              cur_removed_size);
         remove_file_blocks_and_clean_time_maps(to_evict, cache_lock);
 
         return !is_overflow(removed_size, size, cur_cache_size);
@@ -948,10 +1040,6 @@ bool BlockFileCache::try_reserve(const UInt128Wrapper& 
hash, const CacheContext&
         size = 5 * size;
     }
 
-    if (context.cache_type == FileCacheType::TTL) {
-        return try_reserve_for_ttl(size, cache_lock);
-    }
-
     auto query_context = config::enable_file_cache_query_limit &&
                                          (context.query_id.hi != 0 || 
context.query_id.lo != 0)
                                  ? get_query_context(context.query_id, 
cache_lock)
@@ -1112,12 +1200,33 @@ void BlockFileCache::remove_if_cached(const 
UInt128Wrapper& file_key) {
     }
 }
 
-std::vector<FileCacheType> BlockFileCache::get_other_cache_type(FileCacheType 
cur_cache_type) {
+std::vector<FileCacheType> BlockFileCache::get_other_cache_type_without_ttl(
+        FileCacheType cur_cache_type) {
     switch (cur_cache_type) {
+    case FileCacheType::TTL:
+        return {FileCacheType::DISPOSABLE, FileCacheType::NORMAL, 
FileCacheType::INDEX};
     case FileCacheType::INDEX:
         return {FileCacheType::DISPOSABLE, FileCacheType::NORMAL};
     case FileCacheType::NORMAL:
         return {FileCacheType::DISPOSABLE, FileCacheType::INDEX};
+    case FileCacheType::DISPOSABLE:
+        return {FileCacheType::NORMAL, FileCacheType::INDEX};
+    default:
+        return {};
+    }
+    return {};
+}
+
+std::vector<FileCacheType> BlockFileCache::get_other_cache_type(FileCacheType 
cur_cache_type) {
+    switch (cur_cache_type) {
+    case FileCacheType::TTL:
+        return {FileCacheType::DISPOSABLE, FileCacheType::NORMAL, 
FileCacheType::INDEX};
+    case FileCacheType::INDEX:
+        return {FileCacheType::DISPOSABLE, FileCacheType::NORMAL, 
FileCacheType::TTL};
+    case FileCacheType::NORMAL:
+        return {FileCacheType::DISPOSABLE, FileCacheType::INDEX, 
FileCacheType::TTL};
+    case FileCacheType::DISPOSABLE:
+        return {FileCacheType::NORMAL, FileCacheType::INDEX, 
FileCacheType::TTL};
     default:
         return {};
     }
@@ -1143,13 +1252,14 @@ void BlockFileCache::reset_range(const UInt128Wrapper& 
hash, size_t offset, size
 }
 
 bool BlockFileCache::try_reserve_from_other_queue_by_hot_interval(
-        std::vector<FileCacheType> other_cache_types, size_t size, int64_t 
cur_time,
-        std::lock_guard<std::mutex>& cache_lock) {
+        FileCacheType cur_type, std::vector<FileCacheType> other_cache_types, 
size_t size,
+        int64_t cur_time, std::lock_guard<std::mutex>& cache_lock) {
     size_t removed_size = 0;
     size_t cur_cache_size = _cur_cache_size;
     std::vector<FileBlockCell*> to_evict;
     for (FileCacheType cache_type : other_cache_types) {
         auto& queue = get_queue(cache_type);
+        size_t remove_size_per_type = 0;
         for (const auto& [entry_key, entry_offset, entry_size] : queue) {
             if (!is_overflow(removed_size, size, cur_cache_size)) {
                 break;
@@ -1171,39 +1281,48 @@ bool 
BlockFileCache::try_reserve_from_other_queue_by_hot_interval(
                 DCHECK(file_block->_download_state == 
FileBlock::State::DOWNLOADED);
                 to_evict.push_back(cell);
                 removed_size += cell_size;
+                remove_size_per_type += cell_size;
             }
         }
+        *(_evict_by_heat_metrics_matrix[cache_type][cur_type]) << 
remove_size_per_type;
     }
     remove_file_blocks(to_evict, cache_lock);
 
     return !is_overflow(removed_size, size, cur_cache_size);
 }
 
-bool BlockFileCache::is_overflow(size_t removed_size, size_t need_size, size_t 
cur_cache_size,
-                                 bool is_ttl) const {
+bool BlockFileCache::is_overflow(size_t removed_size, size_t need_size,
+                                 size_t cur_cache_size) const {
     bool ret = false;
     if (_disk_resource_limit_mode) {
         ret = (removed_size < need_size);
     } else {
         ret = (cur_cache_size + need_size - removed_size > _capacity);
     }
-    if (is_ttl) {
-        size_t ttl_threshold = config::max_ttl_cache_ratio * _capacity / 100;
-        return (ret || ((cur_cache_size + need_size - removed_size) > 
ttl_threshold));
-    }
     return ret;
 }
 
 bool BlockFileCache::try_reserve_from_other_queue_by_size(
-        std::vector<FileCacheType> other_cache_types, size_t size,
+        FileCacheType cur_type, std::vector<FileCacheType> other_cache_types, 
size_t size,
         std::lock_guard<std::mutex>& cache_lock) {
     size_t removed_size = 0;
     size_t cur_cache_size = _cur_cache_size;
     std::vector<FileBlockCell*> to_evict;
+    // we follow the privilege defined in get_other_cache_types to evict
     for (FileCacheType cache_type : other_cache_types) {
         auto& queue = get_queue(cache_type);
+
+        // we will not drain each of them to the bottom -- i.e., we only
+        // evict what they have stolen.
+        size_t cur_queue_size = queue.get_capacity(cache_lock);
+        size_t cur_queue_max_size = queue.get_max_size();
+        if (cur_queue_size <= cur_queue_max_size) {
+            continue;
+        }
+        size_t cur_removed_size = 0;
         find_evict_candidates(queue, size, cur_cache_size, removed_size, 
to_evict, cache_lock,
-                              false);
+                              cur_removed_size);
+        *(_evict_by_size_metrics_matrix[cache_type][cur_type]) << 
cur_removed_size;
     }
     remove_file_blocks(to_evict, cache_lock);
     return !is_overflow(removed_size, size, cur_cache_size);
@@ -1212,16 +1331,15 @@ bool 
BlockFileCache::try_reserve_from_other_queue_by_size(
 bool BlockFileCache::try_reserve_from_other_queue(FileCacheType 
cur_cache_type, size_t size,
                                                   int64_t cur_time,
                                                   std::lock_guard<std::mutex>& 
cache_lock) {
-    // disposable queue cannot reserve other queues
-    if (cur_cache_type == FileCacheType::DISPOSABLE) {
-        return false;
-    }
-    auto other_cache_types = get_other_cache_type(cur_cache_type);
-    bool reserve_success = 
try_reserve_from_other_queue_by_hot_interval(other_cache_types, size,
-                                                                        
cur_time, cache_lock);
+    // currently, TTL cache is not considered as a candidate
+    auto other_cache_types = get_other_cache_type_without_ttl(cur_cache_type);
+    bool reserve_success = try_reserve_from_other_queue_by_hot_interval(
+            cur_cache_type, other_cache_types, size, cur_time, cache_lock);
     if (reserve_success || 
!config::file_cache_enable_evict_from_other_queue_by_size) {
         return reserve_success;
     }
+
+    other_cache_types = get_other_cache_type(cur_cache_type);
     auto& cur_queue = get_queue(cur_cache_type);
     size_t cur_queue_size = cur_queue.get_capacity(cache_lock);
     size_t cur_queue_max_size = cur_queue.get_max_size();
@@ -1229,7 +1347,8 @@ bool 
BlockFileCache::try_reserve_from_other_queue(FileCacheType cur_cache_type,
     if (_cur_cache_size + size > _capacity && cur_queue_size + size > 
cur_queue_max_size) {
         return false;
     }
-    return try_reserve_from_other_queue_by_size(other_cache_types, size, 
cache_lock);
+    return try_reserve_from_other_queue_by_size(cur_cache_type, 
other_cache_types, size,
+                                                cache_lock);
 }
 
 bool BlockFileCache::try_reserve_for_lru(const UInt128Wrapper& hash,
@@ -1245,9 +1364,11 @@ bool BlockFileCache::try_reserve_for_lru(const 
UInt128Wrapper& hash,
         size_t cur_cache_size = _cur_cache_size;
 
         std::vector<FileBlockCell*> to_evict;
+        size_t cur_removed_size = 0;
         find_evict_candidates(queue, size, cur_cache_size, removed_size, 
to_evict, cache_lock,
-                              false);
+                              cur_removed_size);
         remove_file_blocks(to_evict, cache_lock);
+        *(_evict_by_self_lru_metrics_matrix[context.cache_type]) << 
cur_removed_size;
 
         if (is_overflow(removed_size, size, cur_cache_size)) {
             return false;
@@ -1522,6 +1643,7 @@ std::string BlockFileCache::reset_capacity(size_t 
new_capacity) {
                 ss << " ttl_queue released " << queue_released;
             }
             _disk_resource_limit_mode = true;
+            _disk_limit_mode_metrics->set_value(1);
             _async_clear_file_cache = true;
             ss << " total_space_released=" << space_released;
         }
@@ -1542,6 +1664,7 @@ void BlockFileCache::check_disk_resource_limit() {
     }
     if (_capacity > _cur_cache_size) {
         _disk_resource_limit_mode = false;
+        _disk_limit_mode_metrics->set_value(0);
     }
     std::pair<int, int> percent;
     int ret = disk_used_percentage(_cache_base_path, &percent);
@@ -1567,10 +1690,12 @@ void BlockFileCache::check_disk_resource_limit() {
     if (capacity_percentage >= 
config::file_cache_enter_disk_resource_limit_mode_percent ||
         inode_is_insufficient(inode_percentage)) {
         _disk_resource_limit_mode = true;
+        _disk_limit_mode_metrics->set_value(1);
     } else if (_disk_resource_limit_mode &&
                (capacity_percentage < 
config::file_cache_exit_disk_resource_limit_mode_percent) &&
                (inode_percentage < 
config::file_cache_exit_disk_resource_limit_mode_percent)) {
         _disk_resource_limit_mode = false;
+        _disk_limit_mode_metrics->set_value(0);
     }
     if (_disk_resource_limit_mode) {
         // log per mins
@@ -1685,14 +1810,9 @@ void BlockFileCache::modify_expiration_time(const 
UInt128Wrapper& hash,
             if (st.ok()) {
                 auto& queue = get_queue(origin_type);
                 queue.remove(cell.queue_iterator.value(), cache_lock);
-                if (config::enable_ttl_cache_evict_using_lru) {
-                    auto& ttl_queue = get_queue(FileCacheType::TTL);
-                    cell.queue_iterator =
-                            ttl_queue.add(hash, cell.file_block->offset(),
-                                          cell.file_block->range().size(), 
cache_lock);
-                } else {
-                    cell.queue_iterator.reset();
-                }
+                auto& ttl_queue = get_queue(FileCacheType::TTL);
+                cell.queue_iterator = ttl_queue.add(hash, 
cell.file_block->offset(),
+                                                    
cell.file_block->range().size(), cache_lock);
             }
             if (!st.ok()) {
                 LOG_WARNING("").error(st);
@@ -1850,6 +1970,12 @@ std::map<std::string, double> 
BlockFileCache::get_stats() {
     stats["index_queue_curr_elements"] =
             (double)_cur_index_queue_element_count_metrics->get_value();
 
+    stats["ttl_queue_max_size"] = (double)_ttl_queue.get_max_size();
+    stats["ttl_queue_curr_size"] = 
(double)_cur_ttl_cache_lru_queue_cache_size_metrics->get_value();
+    stats["ttl_queue_max_elements"] = 
(double)_ttl_queue.get_max_element_size();
+    stats["ttl_queue_curr_elements"] =
+            
(double)_cur_ttl_cache_lru_queue_element_count_metrics->get_value();
+
     stats["normal_queue_max_size"] = (double)_normal_queue.get_max_size();
     stats["normal_queue_curr_size"] = 
(double)_cur_normal_queue_element_count_metrics->get_value();
     stats["normal_queue_max_elements"] = 
(double)_normal_queue.get_max_element_size();
@@ -1866,6 +1992,36 @@ std::map<std::string, double> 
BlockFileCache::get_stats() {
     return stats;
 }
 
+// for be UTs
+std::map<std::string, double> BlockFileCache::get_stats_unsafe() {
+    std::map<std::string, double> stats;
+    stats["hits_ratio"] = (double)_hit_ratio->get_value();
+    stats["hits_ratio_5m"] = (double)_hit_ratio_5m->get_value();
+    stats["hits_ratio_1h"] = (double)_hit_ratio_1h->get_value();
+
+    stats["index_queue_max_size"] = (double)_index_queue.get_max_size();
+    stats["index_queue_curr_size"] = 
(double)_index_queue.get_capacity_unsafe();
+    stats["index_queue_max_elements"] = 
(double)_index_queue.get_max_element_size();
+    stats["index_queue_curr_elements"] = 
(double)_index_queue.get_elements_num_unsafe();
+
+    stats["ttl_queue_max_size"] = (double)_ttl_queue.get_max_size();
+    stats["ttl_queue_curr_size"] = (double)_ttl_queue.get_capacity_unsafe();
+    stats["ttl_queue_max_elements"] = 
(double)_ttl_queue.get_max_element_size();
+    stats["ttl_queue_curr_elements"] = 
(double)_ttl_queue.get_elements_num_unsafe();
+
+    stats["normal_queue_max_size"] = (double)_normal_queue.get_max_size();
+    stats["normal_queue_curr_size"] = 
(double)_normal_queue.get_capacity_unsafe();
+    stats["normal_queue_max_elements"] = 
(double)_normal_queue.get_max_element_size();
+    stats["normal_queue_curr_elements"] = 
(double)_normal_queue.get_elements_num_unsafe();
+
+    stats["disposable_queue_max_size"] = 
(double)_disposable_queue.get_max_size();
+    stats["disposable_queue_curr_size"] = 
(double)_disposable_queue.get_capacity_unsafe();
+    stats["disposable_queue_max_elements"] = 
(double)_disposable_queue.get_max_element_size();
+    stats["disposable_queue_curr_elements"] = 
(double)_disposable_queue.get_elements_num_unsafe();
+
+    return stats;
+}
+
 template void BlockFileCache::remove(FileBlockSPtr file_block,
                                      std::lock_guard<std::mutex>& cache_lock,
                                      std::lock_guard<std::mutex>& block_lock);
diff --git a/be/src/io/cache/block_file_cache.h 
b/be/src/io/cache/block_file_cache.h
index ac30e2411fa..c0c66334a2b 100644
--- a/be/src/io/cache/block_file_cache.h
+++ b/be/src/io/cache/block_file_cache.h
@@ -145,6 +145,9 @@ public:
 
     std::map<std::string, double> get_stats();
 
+    // for be UTs
+    std::map<std::string, double> get_stats_unsafe();
+
     class LRUQueue {
     public:
         LRUQueue() = default;
@@ -179,6 +182,10 @@ public:
             return cache_size;
         }
 
+        size_t get_capacity_unsafe() const { return cache_size; }
+
+        size_t get_elements_num_unsafe() const { return queue.size(); }
+
         size_t get_elements_num(std::lock_guard<std::mutex>& /* cache_lock */) 
const {
             return queue.size();
         }
@@ -345,6 +352,7 @@ private:
     bool try_reserve_during_async_load(size_t size, 
std::lock_guard<std::mutex>& cache_lock);
 
     std::vector<FileCacheType> get_other_cache_type(FileCacheType 
cur_cache_type);
+    std::vector<FileCacheType> get_other_cache_type_without_ttl(FileCacheType 
cur_cache_type);
 
     bool try_reserve_from_other_queue(FileCacheType cur_cache_type, size_t 
offset, int64_t cur_time,
                                       std::lock_guard<std::mutex>& cache_lock);
@@ -390,15 +398,16 @@ private:
 
     void recycle_deleted_blocks();
 
-    bool 
try_reserve_from_other_queue_by_hot_interval(std::vector<FileCacheType> 
other_cache_types,
+    bool try_reserve_from_other_queue_by_hot_interval(FileCacheType cur_type,
+                                                      
std::vector<FileCacheType> other_cache_types,
                                                       size_t size, int64_t 
cur_time,
                                                       
std::lock_guard<std::mutex>& cache_lock);
 
-    bool try_reserve_from_other_queue_by_size(std::vector<FileCacheType> 
other_cache_types,
+    bool try_reserve_from_other_queue_by_size(FileCacheType cur_type,
+                                              std::vector<FileCacheType> 
other_cache_types,
                                               size_t size, 
std::lock_guard<std::mutex>& cache_lock);
 
-    bool is_overflow(size_t removed_size, size_t need_size, size_t 
cur_cache_size,
-                     bool is_ttl = false) const;
+    bool is_overflow(size_t removed_size, size_t need_size, size_t 
cur_cache_size) const;
 
     void remove_file_blocks(std::vector<FileBlockCell*>&, 
std::lock_guard<std::mutex>&);
 
@@ -407,7 +416,10 @@ private:
 
     void find_evict_candidates(LRUQueue& queue, size_t size, size_t 
cur_cache_size,
                                size_t& removed_size, 
std::vector<FileBlockCell*>& to_evict,
-                               std::lock_guard<std::mutex>& cache_lock, bool 
is_ttl);
+                               std::lock_guard<std::mutex>& cache_lock, 
size_t& cur_removed_size);
+
+    void recycle_stale_rowset_async_bottom_half();
+
     // info
     std::string _cache_base_path;
     size_t _capacity = 0;
@@ -459,6 +471,10 @@ private:
     std::shared_ptr<bvar::Status<size_t>> 
_cur_disposable_queue_cache_size_metrics;
     std::array<std::shared_ptr<bvar::Adder<size_t>>, 4> 
_queue_evict_size_metrics;
     std::shared_ptr<bvar::Adder<size_t>> _total_evict_size_metrics;
+    std::shared_ptr<bvar::Adder<size_t>> _evict_by_heat_metrics_matrix[4][4];
+    std::shared_ptr<bvar::Adder<size_t>> _evict_by_size_metrics_matrix[4][4];
+    std::shared_ptr<bvar::Adder<size_t>> _evict_by_self_lru_metrics_matrix[4];
+    std::shared_ptr<bvar::Adder<size_t>> _evict_by_try_release;
 
     std::shared_ptr<bvar::Window<bvar::Adder<size_t>>> _num_hit_blocks_5m;
     std::shared_ptr<bvar::Window<bvar::Adder<size_t>>> _num_read_blocks_5m;
@@ -472,6 +488,7 @@ private:
     std::shared_ptr<bvar::Status<double>> _hit_ratio;
     std::shared_ptr<bvar::Status<double>> _hit_ratio_5m;
     std::shared_ptr<bvar::Status<double>> _hit_ratio_1h;
+    std::shared_ptr<bvar::Status<size_t>> _disk_limit_mode_metrics;
 };
 
 } // namespace doris::io
diff --git a/be/src/io/cache/file_cache_common.cpp 
b/be/src/io/cache/file_cache_common.cpp
index c569ace0011..67487930045 100644
--- a/be/src/io/cache/file_cache_common.cpp
+++ b/be/src/io/cache/file_cache_common.cpp
@@ -34,6 +34,7 @@ std::string FileCacheSettings::to_string() const {
        << ", disposable_queue_elements: " << disposable_queue_elements
        << ", index_queue_size: " << index_queue_size
        << ", index_queue_elements: " << index_queue_elements
+       << ", ttl_queue_size: " << ttl_queue_size << ", ttl_queue_elements: " 
<< ttl_queue_elements
        << ", query_queue_size: " << query_queue_size
        << ", query_queue_elements: " << query_queue_elements << ", storage: " 
<< storage;
     return ss.str();
@@ -58,6 +59,10 @@ FileCacheSettings get_file_cache_settings(size_t capacity, 
size_t max_query_cach
             std::max(settings.index_queue_size / settings.max_file_block_size,
                      REMOTE_FS_OBJECTS_CACHE_DEFAULT_ELEMENTS);
 
+    settings.ttl_queue_size = per_size * config::max_ttl_cache_ratio;
+    settings.ttl_queue_elements = std::max(settings.ttl_queue_size / 
settings.max_file_block_size,
+                                           
REMOTE_FS_OBJECTS_CACHE_DEFAULT_ELEMENTS);
+
     settings.query_queue_size =
             settings.capacity - settings.disposable_queue_size - 
settings.index_queue_size;
     settings.query_queue_elements =
diff --git a/be/src/io/cache/file_cache_common.h 
b/be/src/io/cache/file_cache_common.h
index 21309831a82..30579ba7851 100644
--- a/be/src/io/cache/file_cache_common.h
+++ b/be/src/io/cache/file_cache_common.h
@@ -26,17 +26,17 @@ namespace doris::io {
 
 inline static constexpr size_t REMOTE_FS_OBJECTS_CACHE_DEFAULT_ELEMENTS = 100 
* 1024;
 inline static constexpr size_t FILE_CACHE_MAX_FILE_BLOCK_SIZE = 1 * 1024 * 
1024;
-inline static constexpr size_t DEFAULT_NORMAL_PERCENT = 85;
-inline static constexpr size_t DEFAULT_DISPOSABLE_PERCENT = 10;
+inline static constexpr size_t DEFAULT_NORMAL_PERCENT = 40;
+inline static constexpr size_t DEFAULT_DISPOSABLE_PERCENT = 5;
 inline static constexpr size_t DEFAULT_INDEX_PERCENT = 5;
 
 using uint128_t = vectorized::UInt128;
 
-enum class FileCacheType {
-    INDEX,
-    NORMAL,
-    DISPOSABLE,
-    TTL,
+enum FileCacheType { // unscoped enum with explicit values, so enumerators convert implicitly to integers
+    INDEX = 2,
+    NORMAL = 1,
+    DISPOSABLE = 0,
+    TTL = 3, // NOTE(review): values 0..3 presumably index the 4-entry per-type metrics arrays - confirm
+};
 
 struct UInt128Wrapper {
@@ -93,6 +93,8 @@ struct FileCacheSettings {
     size_t index_queue_elements {0};
     size_t query_queue_size {0};
     size_t query_queue_elements {0};
+    size_t ttl_queue_size {0};
+    size_t ttl_queue_elements {0};
     size_t max_file_block_size {0};
     size_t max_query_cache_size {0};
     std::string storage;
diff --git a/be/test/io/cache/block_file_cache_test.cpp 
b/be/test/io/cache/block_file_cache_test.cpp
index f77dc439e95..11e99a48052 100644
--- a/be/test/io/cache/block_file_cache_test.cpp
+++ b/be/test/io/cache/block_file_cache_test.cpp
@@ -81,7 +81,7 @@ constexpr unsigned long long operator"" _kb(unsigned long 
long m) {
 void assert_range([[maybe_unused]] size_t assert_n, io::FileBlockSPtr 
file_block,
                   const io::FileBlock::Range& expected_range, 
io::FileBlock::State expected_state) {
     auto range = file_block->range();
-
+    std::cout << "assert_range num: " << assert_n << std::endl;
     ASSERT_EQ(range.left, expected_range.left);
     ASSERT_EQ(range.right, expected_range.right);
     ASSERT_EQ(file_block->state(), expected_state);
@@ -139,7 +139,6 @@ class BlockFileCacheTest : public testing::Test {
 public:
     static void SetUpTestSuite() {
         config::file_cache_enter_disk_resource_limit_mode_percent = 99;
-        config::enable_ttl_cache_evict_using_lru = false;
         bool exists {false};
         ASSERT_TRUE(global_local_filesystem()->exists(caches_dir, 
&exists).ok());
         if (!exists) {
@@ -1110,8 +1109,10 @@ TEST_F(BlockFileCacheTest, max_ttl_size) {
     query_id.hi = 1;
     query_id.lo = 1;
     io::FileCacheSettings settings;
-    settings.query_queue_size = 100000000;
-    settings.query_queue_elements = 100000;
+    settings.query_queue_size = 50000000;
+    settings.query_queue_elements = 50000;
+    settings.ttl_queue_size = 50000000;
+    settings.ttl_queue_elements = 50000;
     settings.capacity = 100000000;
     settings.max_file_block_size = 100000;
     settings.max_query_cache_size = 30;
@@ -1136,7 +1137,7 @@ TEST_F(BlockFileCacheTest, max_ttl_size) {
         auto holder = cache.get_or_set(key1, offset, 100000, context);
         auto blocks = fromHolder(holder);
         ASSERT_EQ(blocks.size(), 1);
-        if (offset < 90000000) {
+        if (offset < 50000000) {
             assert_range(1, blocks[0], io::FileBlock::Range(offset, offset + 
99999),
                          io::FileBlock::State::EMPTY);
             ASSERT_TRUE(blocks[0]->get_or_set_downloader() == 
io::FileBlock::get_caller_id());
@@ -1145,7 +1146,79 @@ TEST_F(BlockFileCacheTest, max_ttl_size) {
                          io::FileBlock::State::DOWNLOADED);
         } else {
             assert_range(1, blocks[0], io::FileBlock::Range(offset, offset + 
99999),
-                         io::FileBlock::State::SKIP_CACHE);
+                         io::FileBlock::State::EMPTY);
+        }
+        blocks.clear();
+    }
+    if (fs::exists(cache_base_path)) {
+        fs::remove_all(cache_base_path);
+    }
+}
+
+TEST_F(BlockFileCacheTest, max_ttl_size_with_other_cache_exist) {
+    if (fs::exists(cache_base_path)) {
+        fs::remove_all(cache_base_path);
+    }
+    fs::create_directories(cache_base_path);
+    TUniqueId query_id;
+    query_id.hi = 1;
+    query_id.lo = 1;
+    io::FileCacheSettings settings;
+    settings.query_queue_size = 50000000;
+    settings.query_queue_elements = 50000;
+    settings.ttl_queue_size = 50000000;
+    settings.ttl_queue_elements = 50000;
+    settings.capacity = 100000000;
+    settings.max_file_block_size = 100000;
+    settings.max_query_cache_size = 30;
+
+    auto key1 = io::BlockFileCache::hash("key5");
+    io::BlockFileCache cache(cache_base_path, settings);
+    ASSERT_TRUE(cache.initialize());
+
+    int i = 0;
+    for (; i < 100; i++) {
+        if (cache.get_async_open_success()) {
+            break;
+        }
+        std::this_thread::sleep_for(std::chrono::milliseconds(1));
+    }
+    ASSERT_TRUE(cache.get_async_open_success());
+
+    // Populate the cache with blocks of another cache type (NORMAL) first.
+    io::CacheContext context;
+    context.cache_type = io::FileCacheType::NORMAL;
+    context.query_id = query_id;
+    int64_t offset = 100000000;
+    for (; offset < 180000000; offset += 100000) {
+        auto holder = cache.get_or_set(key1, offset, 100000, context);
+        auto blocks = fromHolder(holder);
+        ASSERT_EQ(blocks.size(), 1);
+        assert_range(1, blocks[0], io::FileBlock::Range(offset, offset + 
99999),
+                     io::FileBlock::State::EMPTY);
+        blocks.clear();
+    }
+
+    // Then start requesting TTL blocks.
+    context.cache_type = io::FileCacheType::TTL;
+    context.query_id = query_id;
+    int64_t cur_time = UnixSeconds();
+    context.expiration_time = cur_time + 120;
+    offset = 0;
+    for (; offset < 100000000; offset += 100000) {
+        auto holder = cache.get_or_set(key1, offset, 100000, context);
+        auto blocks = fromHolder(holder);
+        ASSERT_EQ(blocks.size(), 1);
+        if (offset < 50000000) {
+            assert_range(1, blocks[0], io::FileBlock::Range(offset, offset + 
99999),
+                         io::FileBlock::State::EMPTY);
+            ASSERT_TRUE(blocks[0]->get_or_set_downloader() == 
io::FileBlock::get_caller_id());
+            download(blocks[0]);
+            assert_range(1, blocks[0], io::FileBlock::Range(offset, offset + 
99999),
+                         io::FileBlock::State::DOWNLOADED);
+        } else {
+            assert_range(1, blocks[0], io::FileBlock::Range(offset, offset + 
99999),
+                         io::FileBlock::State::EMPTY);
         }
         blocks.clear();
     }
@@ -1195,7 +1268,7 @@ TEST_F(BlockFileCacheTest, max_ttl_size_memory_storage) {
                          io::FileBlock::State::DOWNLOADED);
         } else {
             assert_range(1, blocks[0], io::FileBlock::Range(offset, offset + 
99999),
-                         io::FileBlock::State::SKIP_CACHE);
+                         io::FileBlock::State::EMPTY);
         }
         blocks.clear();
     }
@@ -2065,7 +2138,9 @@ TEST_F(BlockFileCacheTest, ttl_normal) {
     io::FileCacheSettings settings;
     settings.query_queue_size = 50;
     settings.query_queue_elements = 5;
-    settings.capacity = 50;
+    settings.ttl_queue_size = 50;
+    settings.ttl_queue_elements = 5;
+    settings.capacity = 100;
     settings.max_file_block_size = 30;
     settings.max_query_cache_size = 30;
     io::CacheContext context;
@@ -2160,7 +2235,9 @@ TEST_F(BlockFileCacheTest, ttl_modify) {
     io::FileCacheSettings settings;
     settings.query_queue_size = 30;
     settings.query_queue_elements = 5;
-    settings.capacity = 30;
+    settings.ttl_queue_size = 30;
+    settings.ttl_queue_elements = 5;
+    settings.capacity = 60;
     settings.max_file_block_size = 30;
     settings.max_query_cache_size = 30;
     io::CacheContext context;
@@ -2314,7 +2391,9 @@ TEST_F(BlockFileCacheTest, ttl_change_to_normal) {
     io::FileCacheSettings settings;
     settings.query_queue_size = 30;
     settings.query_queue_elements = 5;
-    settings.capacity = 30;
+    settings.ttl_queue_size = 30;
+    settings.ttl_queue_elements = 5;
+    settings.capacity = 60;
     settings.max_file_block_size = 30;
     settings.max_query_cache_size = 30;
     io::CacheContext context;
@@ -2428,7 +2507,9 @@ TEST_F(BlockFileCacheTest, ttl_change_expiration_time) {
     io::FileCacheSettings settings;
     settings.query_queue_size = 30;
     settings.query_queue_elements = 5;
-    settings.capacity = 30;
+    settings.ttl_queue_size = 30;
+    settings.ttl_queue_elements = 5;
+    settings.capacity = 60;
     settings.max_file_block_size = 30;
     settings.max_query_cache_size = 30;
     io::CacheContext context;
@@ -2450,6 +2531,16 @@ TEST_F(BlockFileCacheTest, ttl_change_expiration_time) {
         auto holder = cache.get_or_set(key2, 50, 10, context); /// Add range 
[50, 59]
         auto blocks = fromHolder(holder);
         ASSERT_EQ(blocks.size(), 1);
+        // std::cout << "current cache size:"  << cache.get_used_cache_size() 
<< std::endl;
+        std::cout << "cache capacity:" << cache.capacity() << std::endl;
+        auto map = cache.get_stats_unsafe();
+        for (auto& [key, value] : map) {
+            std::cout << key << " : " << value << std::endl;
+        }
+        auto key1 = io::BlockFileCache::hash("key1");
+        std::cout << cache.dump_structure(key1) << std::endl;
+        std::cout << cache.dump_structure(key2) << std::endl;
+
         assert_range(1, blocks[0], io::FileBlock::Range(50, 59), 
io::FileBlock::State::EMPTY);
         ASSERT_TRUE(blocks[0]->get_or_set_downloader() == 
io::FileBlock::get_caller_id());
         download(blocks[0]);
@@ -2532,105 +2623,6 @@ TEST_F(BlockFileCacheTest, 
ttl_change_expiration_time_memory_storage) {
     }
 }
 
-TEST_F(BlockFileCacheTest, ttl_reverse) {
-    if (fs::exists(cache_base_path)) {
-        fs::remove_all(cache_base_path);
-    }
-    fs::create_directories(cache_base_path);
-    test_file_cache(io::FileCacheType::NORMAL);
-    TUniqueId query_id;
-    query_id.hi = 1;
-    query_id.lo = 1;
-    io::FileCacheSettings settings;
-    settings.query_queue_size = 36;
-    settings.query_queue_elements = 5;
-    settings.capacity = 36;
-    settings.max_file_block_size = 7;
-    settings.max_query_cache_size = 30;
-    io::CacheContext context;
-    context.cache_type = io::FileCacheType::TTL;
-    context.query_id = query_id;
-    int64_t cur_time = UnixSeconds();
-    context.expiration_time = cur_time + 180;
-    auto key2 = io::BlockFileCache::hash("key2");
-    io::BlockFileCache cache(cache_base_path, settings);
-    ASSERT_TRUE(cache.initialize());
-    for (int i = 0; i < 100; i++) {
-        if (cache.get_async_open_success()) {
-            break;
-        };
-        std::this_thread::sleep_for(std::chrono::milliseconds(1));
-    }
-    ASSERT_TRUE(cache.get_async_open_success());
-    for (size_t offset = 0; offset < 30; offset += 6) {
-        auto holder = cache.get_or_set(key2, offset, 6, context);
-        auto blocks = fromHolder(holder);
-        ASSERT_TRUE(blocks[0]->get_or_set_downloader() == 
io::FileBlock::get_caller_id());
-        download(blocks[0]);
-    }
-    {
-        auto holder = cache.get_or_set(key2, 50, 7, context); /// Add range 
[50, 57]
-        auto blocks = fromHolder(holder);
-        assert_range(1, blocks[0], io::FileBlock::Range(50, 56), 
io::FileBlock::State::SKIP_CACHE);
-    }
-    {
-        context.cache_type = io::FileCacheType::NORMAL;
-        auto holder = cache.get_or_set(key2, 50, 7, context); /// Add range 
[50, 57]
-        auto blocks = fromHolder(holder);
-        assert_range(1, blocks[0], io::FileBlock::Range(50, 56), 
io::FileBlock::State::SKIP_CACHE);
-    }
-
-    if (fs::exists(cache_base_path)) {
-        fs::remove_all(cache_base_path);
-    }
-}
-
-TEST_F(BlockFileCacheTest, ttl_reverse_memory_storage) {
-    test_file_cache_memory_storage(io::FileCacheType::NORMAL);
-    TUniqueId query_id;
-    query_id.hi = 1;
-    query_id.lo = 1;
-    io::FileCacheSettings settings;
-    settings.query_queue_size = 36;
-    settings.query_queue_elements = 5;
-    settings.capacity = 36;
-    settings.max_file_block_size = 7;
-    settings.max_query_cache_size = 30;
-    settings.storage = "memory";
-    io::CacheContext context;
-    context.cache_type = io::FileCacheType::TTL;
-    context.query_id = query_id;
-    int64_t cur_time = UnixSeconds();
-    context.expiration_time = cur_time + 180;
-    auto key2 = io::BlockFileCache::hash("key2");
-    io::BlockFileCache cache(cache_base_path, settings);
-    ASSERT_TRUE(cache.initialize());
-    for (int i = 0; i < 100; i++) {
-        if (cache.get_async_open_success()) {
-            break;
-        };
-        std::this_thread::sleep_for(std::chrono::milliseconds(1));
-    }
-    ASSERT_TRUE(cache.get_async_open_success());
-    for (size_t offset = 0; offset < 30; offset += 6) {
-        auto holder = cache.get_or_set(key2, offset, 6, context);
-        auto blocks = fromHolder(holder);
-        ASSERT_TRUE(blocks[0]->get_or_set_downloader() == 
io::FileBlock::get_caller_id());
-        download_into_memory(blocks[0]);
-    }
-    {
-        auto holder = cache.get_or_set(key2, 50, 7, context); /// Add range 
[50, 57]
-        auto blocks = fromHolder(holder);
-        assert_range(1, blocks[0], io::FileBlock::Range(50, 56), 
io::FileBlock::State::SKIP_CACHE);
-    }
-    {
-        context.cache_type = io::FileCacheType::NORMAL;
-        auto holder = cache.get_or_set(key2, 50, 7, context); /// Add range 
[50, 57]
-        auto blocks = fromHolder(holder);
-        assert_range(1, blocks[0], io::FileBlock::Range(50, 56), 
io::FileBlock::State::SKIP_CACHE);
-    }
-}
-
 TEST_F(BlockFileCacheTest, io_error) {
     if (fs::exists(cache_base_path)) {
         fs::remove_all(cache_base_path);
@@ -2906,7 +2898,8 @@ TEST_F(BlockFileCacheTest, recyle_cache_async) {
     cache.clear_file_cache_async();
     while (cache._async_clear_file_cache)
         ;
-    EXPECT_EQ(cache._cur_cache_size, 5);
+    EXPECT_EQ(cache._cur_cache_size, 20); // 0-4 is used again, so all the 
cache data in DISPOSABLE
+                                          // remain unremoved
     if (fs::exists(cache_base_path)) {
         fs::remove_all(cache_base_path);
     }
@@ -4838,10 +4831,7 @@ TEST_F(BlockFileCacheTest, recyle_unvalid_ttl_async) {
     }
 }
 
-TEST_F(BlockFileCacheTest, ttl_reserve_wo_evict_using_lru) {
-    config::file_cache_ttl_valid_check_interval_second = 4;
-    config::enable_ttl_cache_evict_using_lru = false;
-
+TEST_F(BlockFileCacheTest, reset_capacity) {
     if (fs::exists(cache_base_path)) {
         fs::remove_all(cache_base_path);
     }
@@ -4854,18 +4844,26 @@ TEST_F(BlockFileCacheTest, 
ttl_reserve_wo_evict_using_lru) {
     settings.query_queue_elements = 5;
     settings.index_queue_size = 30;
     settings.index_queue_elements = 5;
-    settings.disposable_queue_size = 0;
-    settings.disposable_queue_elements = 0;
-    settings.capacity = 60;
+    settings.disposable_queue_size = 30;
+    settings.disposable_queue_elements = 5;
+    settings.capacity = 90;
     settings.max_file_block_size = 30;
     settings.max_query_cache_size = 30;
     io::CacheContext context;
     context.query_id = query_id;
     auto key = io::BlockFileCache::hash("key1");
+    auto key2 = io::BlockFileCache::hash("key2");
     io::BlockFileCache cache(cache_base_path, settings);
-    context.cache_type = io::FileCacheType::TTL;
-    context.expiration_time = UnixSeconds() + 3600;
-
+    auto sp = SyncPoint::get_instance();
+    Defer defer {[sp] {
+        sp->clear_call_back("BlockFileCache::set_remove_batch");
+        sp->clear_call_back("BlockFileCache::set_sleep_time");
+    }};
+    sp->set_call_back("BlockFileCache::set_sleep_time",
+                      [](auto&& args) { *try_any_cast<int64_t*>(args[0]) = 1; 
});
+    sp->set_call_back("BlockFileCache::set_remove_batch",
+                      [](auto&& args) { *try_any_cast<int*>(args[0]) = 2; });
+    sp->enable_processing();
     ASSERT_TRUE(cache.initialize());
     for (int i = 0; i < 100; i++) {
         if (cache.get_async_open_success()) {
@@ -4873,7 +4871,8 @@ TEST_F(BlockFileCacheTest, 
ttl_reserve_wo_evict_using_lru) {
         };
         std::this_thread::sleep_for(std::chrono::milliseconds(1));
     }
-    for (int64_t offset = 0; offset < (60 * config::max_ttl_cache_ratio / 100 
- 5); offset += 5) {
+    for (int64_t offset = 0; offset < 45; offset += 5) {
+        context.cache_type = static_cast<io::FileCacheType>((offset / 5) % 3);
         auto holder = cache.get_or_set(key, offset, 5, context);
         auto segments = fromHolder(holder);
         ASSERT_EQ(segments.size(), 1);
@@ -4885,50 +4884,55 @@ TEST_F(BlockFileCacheTest, 
ttl_reserve_wo_evict_using_lru) {
                      io::FileBlock::State::DOWNLOADED);
     }
     context.cache_type = io::FileCacheType::TTL;
-    context.expiration_time = UnixSeconds() + 3600;
-    for (int64_t offset = 60; offset < 70; offset += 5) {
-        auto holder = cache.get_or_set(key, offset, 5, context);
+    int64_t cur_time = UnixSeconds();
+    context.expiration_time = cur_time + 120;
+    for (int64_t offset = 45; offset < 90; offset += 5) {
+        auto holder = cache.get_or_set(key2, offset, 5, context);
         auto segments = fromHolder(holder);
         ASSERT_EQ(segments.size(), 1);
         assert_range(1, segments[0], io::FileBlock::Range(offset, offset + 4),
-                     io::FileBlock::State::SKIP_CACHE);
+                     io::FileBlock::State::EMPTY);
+        ASSERT_TRUE(segments[0]->get_or_set_downloader() == 
io::FileBlock::get_caller_id());
+        download(segments[0]);
+        assert_range(1, segments[0], io::FileBlock::Range(offset, offset + 4),
+                     io::FileBlock::State::DOWNLOADED);
     }
-
-    EXPECT_EQ(cache._cur_cache_size, 50);
-    EXPECT_EQ(cache._ttl_queue.cache_size, 0);
+    std::cout << cache.reset_capacity(30) << std::endl;
+    while (cache._async_clear_file_cache)
+        ;
+    EXPECT_EQ(cache._cur_cache_size, 30);
     if (fs::exists(cache_base_path)) {
         fs::remove_all(cache_base_path);
     }
 }
 
-TEST_F(BlockFileCacheTest, ttl_reserve_with_evict_using_lru) {
-    config::file_cache_ttl_valid_check_interval_second = 4;
-    config::enable_ttl_cache_evict_using_lru = true;
-
+TEST_F(BlockFileCacheTest, change_cache_type1) {
     if (fs::exists(cache_base_path)) {
         fs::remove_all(cache_base_path);
     }
     fs::create_directories(cache_base_path);
+    auto sp = SyncPoint::get_instance();
+    sp->set_call_back("FileBlock::change_cache_type", [](auto&& args) {
+        *try_any_cast<Status*>(args[0]) = Status::IOError("inject io error");
+    });
+    sp->enable_processing();
     TUniqueId query_id;
     query_id.hi = 1;
     query_id.lo = 1;
     io::FileCacheSettings settings;
     settings.query_queue_size = 30;
     settings.query_queue_elements = 5;
-    settings.index_queue_size = 30;
-    settings.index_queue_elements = 5;
-    settings.disposable_queue_size = 0;
-    settings.disposable_queue_elements = 0;
-    settings.capacity = 60;
+    settings.capacity = 30;
     settings.max_file_block_size = 30;
     settings.max_query_cache_size = 30;
     io::CacheContext context;
+    context.cache_type = io::FileCacheType::TTL;
     context.query_id = query_id;
-    auto key = io::BlockFileCache::hash("key1");
+    int64_t cur_time = UnixSeconds();
+    context.expiration_time = cur_time + 120;
+    int64_t modify_time = cur_time + 5;
+    auto key1 = io::BlockFileCache::hash("key1");
     io::BlockFileCache cache(cache_base_path, settings);
-    context.cache_type = io::FileCacheType::TTL;
-    context.expiration_time = UnixSeconds() + 3600;
-
     ASSERT_TRUE(cache.initialize());
     for (int i = 0; i < 100; i++) {
         if (cache.get_async_open_success()) {
@@ -4936,241 +4940,28 @@ TEST_F(BlockFileCacheTest, 
ttl_reserve_with_evict_using_lru) {
         };
         std::this_thread::sleep_for(std::chrono::milliseconds(1));
     }
-    for (int64_t offset = 0; offset < (60 * config::max_ttl_cache_ratio / 
100); offset += 5) {
-        auto holder = cache.get_or_set(key, offset, 5, context);
+    {
+        auto holder = cache.get_or_set(key1, 50, 10, context); /// Add range 
[50, 59]
         auto segments = fromHolder(holder);
         ASSERT_EQ(segments.size(), 1);
-        assert_range(1, segments[0], io::FileBlock::Range(offset, offset + 4),
-                     io::FileBlock::State::EMPTY);
+        assert_range(1, segments[0], io::FileBlock::Range(50, 59), 
io::FileBlock::State::EMPTY);
         ASSERT_TRUE(segments[0]->get_or_set_downloader() == 
io::FileBlock::get_caller_id());
         download(segments[0]);
-        assert_range(1, segments[0], io::FileBlock::Range(offset, offset + 4),
+        assert_range(1, segments[0], io::FileBlock::Range(50, 59),
                      io::FileBlock::State::DOWNLOADED);
+        EXPECT_EQ(segments[0]->cache_type(), io::FileCacheType::TTL);
+        EXPECT_EQ(segments[0]->expiration_time(), context.expiration_time);
     }
-    context.cache_type = io::FileCacheType::TTL;
-    context.expiration_time = UnixSeconds() + 3600;
-    for (int64_t offset = 60; offset < 70; offset += 5) {
-        auto holder = cache.get_or_set(key, offset, 5, context);
+    context.cache_type = io::FileCacheType::NORMAL;
+    context.expiration_time = 0;
+    {
+        auto holder = cache.get_or_set(key1, 50, 10, context); /// Add range 
[50, 59]
         auto segments = fromHolder(holder);
         ASSERT_EQ(segments.size(), 1);
-        assert_range(1, segments[0], io::FileBlock::Range(offset, offset + 4),
-                     io::FileBlock::State::EMPTY);
-        ASSERT_TRUE(segments[0]->get_or_set_downloader() == 
io::FileBlock::get_caller_id());
-        download(segments[0]);
-        assert_range(1, segments[0], io::FileBlock::Range(offset, offset + 4),
+        assert_range(1, segments[0], io::FileBlock::Range(50, 59),
                      io::FileBlock::State::DOWNLOADED);
-    }
-
-    EXPECT_EQ(cache._cur_cache_size, 50);
-    EXPECT_EQ(cache._ttl_queue.cache_size, 50);
-    if (fs::exists(cache_base_path)) {
-        fs::remove_all(cache_base_path);
-    }
-}
-
-TEST_F(BlockFileCacheTest, 
ttl_reserve_with_evict_using_lru_meet_max_ttl_cache_ratio_limit) {
-    config::file_cache_ttl_valid_check_interval_second = 4;
-    config::enable_ttl_cache_evict_using_lru = true;
-    int old = config::max_ttl_cache_ratio;
-    config::max_ttl_cache_ratio = 50;
-
-    if (fs::exists(cache_base_path)) {
-        fs::remove_all(cache_base_path);
-    }
-    fs::create_directories(cache_base_path);
-    TUniqueId query_id;
-    query_id.hi = 1;
-    query_id.lo = 1;
-    io::FileCacheSettings settings;
-    settings.query_queue_size = 30;
-    settings.query_queue_elements = 5;
-    settings.index_queue_size = 30;
-    settings.index_queue_elements = 5;
-    settings.disposable_queue_size = 0;
-    settings.disposable_queue_elements = 0;
-    settings.capacity = 60;
-    settings.max_file_block_size = 30;
-    settings.max_query_cache_size = 30;
-    io::CacheContext context;
-    context.query_id = query_id;
-    auto key = io::BlockFileCache::hash("key1");
-    io::BlockFileCache cache(cache_base_path, settings);
-    context.cache_type = io::FileCacheType::TTL;
-    context.expiration_time = UnixSeconds() + 3600;
-
-    ASSERT_TRUE(cache.initialize());
-    for (int i = 0; i < 100; i++) {
-        if (cache.get_async_open_success()) {
-            break;
-        };
-        std::this_thread::sleep_for(std::chrono::milliseconds(1));
-    }
-    for (int64_t offset = 0; offset < (60 * config::max_ttl_cache_ratio / 
100); offset += 5) {
-        auto holder = cache.get_or_set(key, offset, 5, context);
-        auto segments = fromHolder(holder);
-        ASSERT_EQ(segments.size(), 1);
-        assert_range(1, segments[0], io::FileBlock::Range(offset, offset + 4),
-                     io::FileBlock::State::EMPTY);
-        ASSERT_TRUE(segments[0]->get_or_set_downloader() == 
io::FileBlock::get_caller_id());
-        download(segments[0]);
-        assert_range(1, segments[0], io::FileBlock::Range(offset, offset + 4),
-                     io::FileBlock::State::DOWNLOADED);
-    }
-    EXPECT_EQ(cache._cur_cache_size, 30);
-    EXPECT_EQ(cache._ttl_queue.cache_size, 30);
-    context.cache_type = io::FileCacheType::TTL;
-    context.expiration_time = UnixSeconds() + 3600;
-    for (int64_t offset = 60; offset < 70; offset += 5) {
-        auto holder = cache.get_or_set(key, offset, 5, context);
-        auto segments = fromHolder(holder);
-        ASSERT_EQ(segments.size(), 1);
-        assert_range(1, segments[0], io::FileBlock::Range(offset, offset + 4),
-                     io::FileBlock::State::EMPTY);
-        ASSERT_TRUE(segments[0]->get_or_set_downloader() == 
io::FileBlock::get_caller_id());
-        download(segments[0]);
-        assert_range(1, segments[0], io::FileBlock::Range(offset, offset + 4),
-                     io::FileBlock::State::DOWNLOADED);
-    }
-
-    EXPECT_EQ(cache._cur_cache_size, 30);
-    EXPECT_EQ(cache._ttl_queue.cache_size, 30);
-    if (fs::exists(cache_base_path)) {
-        fs::remove_all(cache_base_path);
-    }
-    config::max_ttl_cache_ratio = old;
-}
-
-TEST_F(BlockFileCacheTest, reset_capacity) {
-    if (fs::exists(cache_base_path)) {
-        fs::remove_all(cache_base_path);
-    }
-    fs::create_directories(cache_base_path);
-    TUniqueId query_id;
-    query_id.hi = 1;
-    query_id.lo = 1;
-    io::FileCacheSettings settings;
-    settings.query_queue_size = 30;
-    settings.query_queue_elements = 5;
-    settings.index_queue_size = 30;
-    settings.index_queue_elements = 5;
-    settings.disposable_queue_size = 30;
-    settings.disposable_queue_elements = 5;
-    settings.capacity = 90;
-    settings.max_file_block_size = 30;
-    settings.max_query_cache_size = 30;
-    io::CacheContext context;
-    context.query_id = query_id;
-    auto key = io::BlockFileCache::hash("key1");
-    auto key2 = io::BlockFileCache::hash("key2");
-    io::BlockFileCache cache(cache_base_path, settings);
-    auto sp = SyncPoint::get_instance();
-    Defer defer {[sp] {
-        sp->clear_call_back("BlockFileCache::set_remove_batch");
-        sp->clear_call_back("BlockFileCache::set_sleep_time");
-    }};
-    sp->set_call_back("BlockFileCache::set_sleep_time",
-                      [](auto&& args) { *try_any_cast<int64_t*>(args[0]) = 1; 
});
-    sp->set_call_back("BlockFileCache::set_remove_batch",
-                      [](auto&& args) { *try_any_cast<int*>(args[0]) = 2; });
-    sp->enable_processing();
-    ASSERT_TRUE(cache.initialize());
-    for (int i = 0; i < 100; i++) {
-        if (cache.get_async_open_success()) {
-            break;
-        };
-        std::this_thread::sleep_for(std::chrono::milliseconds(1));
-    }
-    for (int64_t offset = 0; offset < 45; offset += 5) {
-        context.cache_type = static_cast<io::FileCacheType>((offset / 5) % 3);
-        auto holder = cache.get_or_set(key, offset, 5, context);
-        auto segments = fromHolder(holder);
-        ASSERT_EQ(segments.size(), 1);
-        assert_range(1, segments[0], io::FileBlock::Range(offset, offset + 4),
-                     io::FileBlock::State::EMPTY);
-        ASSERT_TRUE(segments[0]->get_or_set_downloader() == 
io::FileBlock::get_caller_id());
-        download(segments[0]);
-        assert_range(1, segments[0], io::FileBlock::Range(offset, offset + 4),
-                     io::FileBlock::State::DOWNLOADED);
-    }
-    context.cache_type = io::FileCacheType::TTL;
-    int64_t cur_time = UnixSeconds();
-    context.expiration_time = cur_time + 120;
-    for (int64_t offset = 45; offset < 90; offset += 5) {
-        auto holder = cache.get_or_set(key2, offset, 5, context);
-        auto segments = fromHolder(holder);
-        ASSERT_EQ(segments.size(), 1);
-        assert_range(1, segments[0], io::FileBlock::Range(offset, offset + 4),
-                     io::FileBlock::State::EMPTY);
-        ASSERT_TRUE(segments[0]->get_or_set_downloader() == 
io::FileBlock::get_caller_id());
-        download(segments[0]);
-        assert_range(1, segments[0], io::FileBlock::Range(offset, offset + 4),
-                     io::FileBlock::State::DOWNLOADED);
-    }
-    std::cout << cache.reset_capacity(30) << std::endl;
-    while (cache._async_clear_file_cache)
-        ;
-    EXPECT_EQ(cache._cur_cache_size, 30);
-    if (fs::exists(cache_base_path)) {
-        fs::remove_all(cache_base_path);
-    }
-}
-
-TEST_F(BlockFileCacheTest, change_cache_type1) {
-    if (fs::exists(cache_base_path)) {
-        fs::remove_all(cache_base_path);
-    }
-    fs::create_directories(cache_base_path);
-    auto sp = SyncPoint::get_instance();
-    sp->set_call_back("FileBlock::change_cache_type", [](auto&& args) {
-        *try_any_cast<Status*>(args[0]) = Status::IOError("inject io error");
-    });
-    sp->enable_processing();
-    TUniqueId query_id;
-    query_id.hi = 1;
-    query_id.lo = 1;
-    io::FileCacheSettings settings;
-    settings.query_queue_size = 30;
-    settings.query_queue_elements = 5;
-    settings.capacity = 30;
-    settings.max_file_block_size = 30;
-    settings.max_query_cache_size = 30;
-    io::CacheContext context;
-    context.cache_type = io::FileCacheType::TTL;
-    context.query_id = query_id;
-    int64_t cur_time = UnixSeconds();
-    context.expiration_time = cur_time + 120;
-    int64_t modify_time = cur_time + 5;
-    auto key1 = io::BlockFileCache::hash("key1");
-    io::BlockFileCache cache(cache_base_path, settings);
-    ASSERT_TRUE(cache.initialize());
-    for (int i = 0; i < 100; i++) {
-        if (cache.get_async_open_success()) {
-            break;
-        };
-        std::this_thread::sleep_for(std::chrono::milliseconds(1));
-    }
-    {
-        auto holder = cache.get_or_set(key1, 50, 10, context); /// Add range 
[50, 59]
-        auto segments = fromHolder(holder);
-        ASSERT_EQ(segments.size(), 1);
-        assert_range(1, segments[0], io::FileBlock::Range(50, 59), 
io::FileBlock::State::EMPTY);
-        ASSERT_TRUE(segments[0]->get_or_set_downloader() == 
io::FileBlock::get_caller_id());
-        download(segments[0]);
-        assert_range(1, segments[0], io::FileBlock::Range(50, 59),
-                     io::FileBlock::State::DOWNLOADED);
-        EXPECT_EQ(segments[0]->cache_type(), io::FileCacheType::TTL);
-        EXPECT_EQ(segments[0]->expiration_time(), context.expiration_time);
-    }
-    context.cache_type = io::FileCacheType::NORMAL;
-    context.expiration_time = 0;
-    {
-        auto holder = cache.get_or_set(key1, 50, 10, context); /// Add range 
[50, 59]
-        auto segments = fromHolder(holder);
-        ASSERT_EQ(segments.size(), 1);
-        assert_range(1, segments[0], io::FileBlock::Range(50, 59),
-                     io::FileBlock::State::DOWNLOADED);
-        EXPECT_EQ(segments[0]->cache_type(), io::FileCacheType::NORMAL);
-        EXPECT_EQ(segments[0]->expiration_time(), 0);
+        EXPECT_EQ(segments[0]->cache_type(), io::FileCacheType::NORMAL);
+        EXPECT_EQ(segments[0]->expiration_time(), 0);
     }
     sp->clear_call_back("FileBlock::change_cache_type");
     context.cache_type = io::FileCacheType::TTL;
@@ -5493,4 +5284,1388 @@ TEST_F(BlockFileCacheTest, 
file_cache_path_storage_parse) {
     }
 }
 
+TEST_F(BlockFileCacheTest, populate_empty_cache_with_disposable) { // DISPOSABLE-only fill: up to `limit`, up to `cache_max`, then `exceed` bytes beyond it
+    if (fs::exists(cache_base_path)) { // start from a clean cache directory
+        fs::remove_all(cache_base_path);
+    }
+    fs::create_directories(cache_base_path);
+    TUniqueId query_id;
+    query_id.hi = 1;
+    query_id.lo = 1;
+    io::FileCacheSettings settings;
+
+    settings.ttl_queue_size = 5000000; // the four queue sizes below (5M + 3M + 1M + 1M) add up to settings.capacity
+    settings.ttl_queue_elements = 50000;
+    settings.query_queue_size = 3000000;
+    settings.query_queue_elements = 30000;
+    settings.index_queue_size = 1000000;
+    settings.index_queue_elements = 10000;
+    settings.disposable_queue_size = 1000000;
+    settings.disposable_queue_elements = 10000;
+    settings.capacity = 10000000;
+    settings.max_file_block_size = 100000; // same value as the 100000-byte requests issued below
+    settings.max_query_cache_size = 30;
+
+    size_t limit = 1000000; // same value as settings.disposable_queue_size
+    size_t cache_max = 10000000; // same value as settings.capacity
+    io::CacheContext context;
+    context.cache_type = io::FileCacheType::DISPOSABLE;
+    context.query_id = query_id;
+    // int64_t cur_time = UnixSeconds();
+    // context.expiration_time = cur_time + 120;
+    auto key1 = io::BlockFileCache::hash("key1");
+    io::BlockFileCache cache(cache_base_path, settings);
+    ASSERT_TRUE(cache.initialize());
+    int i = 0;
+    for (; i < 100; i++) { // poll up to 100 times, 1 ms apart, for the async open to succeed
+        if (cache.get_async_open_success()) {
+            break;
+        }
+        std::this_thread::sleep_for(std::chrono::milliseconds(1));
+    }
+    ASSERT_TRUE(cache.get_async_open_success()); // abort the test if the cache never opened
+    int64_t offset = 0;
+    // fill the cache up to `limit`
+    for (; offset < limit; offset += 100000) {
+        auto holder = cache.get_or_set(key1, offset, 100000, context);
+        auto blocks = fromHolder(holder);
+        ASSERT_EQ(blocks.size(), 1); // each request is expected to map to exactly one block
+
+        assert_range(1, blocks[0], io::FileBlock::Range(offset, offset + 
99999),
+                     io::FileBlock::State::EMPTY);
+        ASSERT_TRUE(blocks[0]->get_or_set_downloader() == 
io::FileBlock::get_caller_id()); // this caller must be the block's downloader
+        download(blocks[0]); // test helper defined elsewhere in this file; the block must be DOWNLOADED afterwards
+        assert_range(2, blocks[0], io::FileBlock::Range(offset, offset + 
99999),
+                     io::FileBlock::State::DOWNLOADED);
+
+        blocks.clear();
+    }
+    // keep adding blocks beyond `limit` until `cache_max` is reached
+    for (; offset < cache_max; offset += 100000) {
+        auto holder = cache.get_or_set(key1, offset, 100000, context);
+        auto blocks = fromHolder(holder);
+        ASSERT_EQ(blocks.size(), 1);
+
+        assert_range(3, blocks[0], io::FileBlock::Range(offset, offset + 
99999),
+                     io::FileBlock::State::EMPTY);
+        ASSERT_TRUE(blocks[0]->get_or_set_downloader() == 
io::FileBlock::get_caller_id());
+        download(blocks[0]);
+        assert_range(4, blocks[0], io::FileBlock::Range(offset, offset + 
99999),
+                     io::FileBlock::State::DOWNLOADED);
+
+        blocks.clear();
+    }
+    ASSERT_EQ(cache.get_stats_unsafe()["disposable_queue_curr_size"], 
cache_max); // the DISPOSABLE queue alone is expected to hold cache_max bytes
+    ASSERT_EQ(cache.get_stats_unsafe()["ttl_queue_curr_size"], 0);
+    ASSERT_EQ(cache.get_stats_unsafe()["index_queue_curr_size"], 0);
+    ASSERT_EQ(cache.get_stats_unsafe()["normal_queue_curr_size"], 0);
+    
ASSERT_EQ(cache._evict_by_self_lru_metrics_matrix[FileCacheType::DISPOSABLE]->get_value(),
 0); // the self-LRU eviction metric for DISPOSABLE is expected to still be 0
+
+    // request `exceed` more bytes beyond the cache capacity
+    size_t exceed = 2000000;
+    for (; offset < (cache_max + exceed); offset += 100000) {
+        auto holder = cache.get_or_set(key1, offset, 100000, context);
+        auto blocks = fromHolder(holder);
+        ASSERT_EQ(blocks.size(), 1);
+
+        assert_range(5, blocks[0], io::FileBlock::Range(offset, offset + 
99999),
+                     io::FileBlock::State::EMPTY);
+        ASSERT_TRUE(blocks[0]->get_or_set_downloader() == 
io::FileBlock::get_caller_id());
+        download(blocks[0]);
+        assert_range(6, blocks[0], io::FileBlock::Range(offset, offset + 
99999),
+                     io::FileBlock::State::DOWNLOADED);
+
+        blocks.clear();
+    }
+    ASSERT_EQ(cache.get_stats_unsafe()["disposable_queue_curr_size"], 
cache_max); // the queue size is expected to stay at cache_max
+    ASSERT_EQ(cache.get_stats_unsafe()["ttl_queue_curr_size"], 0);
+    ASSERT_EQ(cache.get_stats_unsafe()["index_queue_curr_size"], 0);
+    ASSERT_EQ(cache.get_stats_unsafe()["normal_queue_curr_size"], 0);
+    
ASSERT_EQ(cache._evict_by_self_lru_metrics_matrix[FileCacheType::DISPOSABLE]->get_value(),
+              exceed); // the self-LRU eviction metric is expected to equal the bytes requested beyond capacity
+
+    if (fs::exists(cache_base_path)) { // remove the cache directory again
+        fs::remove_all(cache_base_path);
+    }
+}
+
+TEST_F(BlockFileCacheTest, populate_empty_cache_with_normal) { // NORMAL-only fill: up to `limit`, up to `cache_max`, then `exceed` bytes beyond it
+    if (fs::exists(cache_base_path)) { // start from a clean cache directory
+        fs::remove_all(cache_base_path);
+    }
+    fs::create_directories(cache_base_path);
+    TUniqueId query_id;
+    query_id.hi = 1;
+    query_id.lo = 1;
+    io::FileCacheSettings settings;
+
+    settings.ttl_queue_size = 5000000; // the four queue sizes below (5M + 3M + 1M + 1M) add up to settings.capacity
+    settings.ttl_queue_elements = 50000;
+    settings.query_queue_size = 3000000;
+    settings.query_queue_elements = 30000;
+    settings.index_queue_size = 1000000;
+    settings.index_queue_elements = 10000;
+    settings.disposable_queue_size = 1000000;
+    settings.disposable_queue_elements = 10000;
+    settings.capacity = 10000000;
+    settings.max_file_block_size = 100000; // same value as the 100000-byte requests issued below
+    settings.max_query_cache_size = 30;
+
+    size_t limit = 3000000; // same value as settings.query_queue_size (presumably the NORMAL queue; confirm)
+    size_t cache_max = 10000000; // same value as settings.capacity
+    io::CacheContext context;
+    context.cache_type = io::FileCacheType::NORMAL;
+    context.query_id = query_id;
+    // int64_t cur_time = UnixSeconds();
+    // context.expiration_time = cur_time + 120;
+    auto key1 = io::BlockFileCache::hash("key1");
+    io::BlockFileCache cache(cache_base_path, settings);
+    ASSERT_TRUE(cache.initialize());
+    int i = 0;
+    for (; i < 100; i++) { // poll up to 100 times, 1 ms apart, for the async open to succeed
+        if (cache.get_async_open_success()) {
+            break;
+        }
+        std::this_thread::sleep_for(std::chrono::milliseconds(1));
+    }
+    ASSERT_TRUE(cache.get_async_open_success()); // abort the test if the cache never opened
+    int64_t offset = 0;
+    // fill the cache up to `limit`
+    for (; offset < limit; offset += 100000) {
+        auto holder = cache.get_or_set(key1, offset, 100000, context);
+        auto blocks = fromHolder(holder);
+        ASSERT_EQ(blocks.size(), 1); // each request is expected to map to exactly one block
+
+        assert_range(1, blocks[0], io::FileBlock::Range(offset, offset + 
99999),
+                     io::FileBlock::State::EMPTY);
+        ASSERT_TRUE(blocks[0]->get_or_set_downloader() == 
io::FileBlock::get_caller_id()); // this caller must be the block's downloader
+        download(blocks[0]); // test helper defined elsewhere in this file; the block must be DOWNLOADED afterwards
+        assert_range(2, blocks[0], io::FileBlock::Range(offset, offset + 
99999),
+                     io::FileBlock::State::DOWNLOADED);
+
+        blocks.clear();
+    }
+    // keep adding blocks beyond `limit` until `cache_max` is reached
+    for (; offset < cache_max; offset += 100000) {
+        auto holder = cache.get_or_set(key1, offset, 100000, context);
+        auto blocks = fromHolder(holder);
+        ASSERT_EQ(blocks.size(), 1);
+
+        assert_range(3, blocks[0], io::FileBlock::Range(offset, offset + 
99999),
+                     io::FileBlock::State::EMPTY);
+        ASSERT_TRUE(blocks[0]->get_or_set_downloader() == 
io::FileBlock::get_caller_id());
+        download(blocks[0]);
+        assert_range(4, blocks[0], io::FileBlock::Range(offset, offset + 
99999),
+                     io::FileBlock::State::DOWNLOADED);
+
+        blocks.clear();
+    }
+    ASSERT_EQ(cache.get_stats_unsafe()["disposable_queue_curr_size"], 0);
+    ASSERT_EQ(cache.get_stats_unsafe()["ttl_queue_curr_size"], 0);
+    ASSERT_EQ(cache.get_stats_unsafe()["index_queue_curr_size"], 0);
+    ASSERT_EQ(cache.get_stats_unsafe()["normal_queue_curr_size"], cache_max); // the NORMAL queue alone is expected to hold cache_max bytes
+    
ASSERT_EQ(cache._evict_by_self_lru_metrics_matrix[FileCacheType::NORMAL]->get_value(),
 0); // the self-LRU eviction metric for NORMAL is expected to still be 0
+
+    // request `exceed` more bytes beyond the cache capacity
+    size_t exceed = 2000000;
+    for (; offset < (cache_max + exceed); offset += 100000) {
+        auto holder = cache.get_or_set(key1, offset, 100000, context);
+        auto blocks = fromHolder(holder);
+        ASSERT_EQ(blocks.size(), 1);
+
+        assert_range(5, blocks[0], io::FileBlock::Range(offset, offset + 
99999),
+                     io::FileBlock::State::EMPTY);
+        ASSERT_TRUE(blocks[0]->get_or_set_downloader() == 
io::FileBlock::get_caller_id());
+        download(blocks[0]);
+        assert_range(6, blocks[0], io::FileBlock::Range(offset, offset + 
99999),
+                     io::FileBlock::State::DOWNLOADED);
+
+        blocks.clear();
+    }
+    ASSERT_EQ(cache.get_stats_unsafe()["disposable_queue_curr_size"], 0);
+    ASSERT_EQ(cache.get_stats_unsafe()["ttl_queue_curr_size"], 0);
+    ASSERT_EQ(cache.get_stats_unsafe()["index_queue_curr_size"], 0);
+    ASSERT_EQ(cache.get_stats_unsafe()["normal_queue_curr_size"], cache_max); // the queue size is expected to stay at cache_max
+    
ASSERT_EQ(cache._evict_by_self_lru_metrics_matrix[FileCacheType::NORMAL]->get_value(),
 exceed); // the self-LRU eviction metric is expected to equal the bytes requested beyond capacity
+
+    if (fs::exists(cache_base_path)) { // remove the cache directory again
+        fs::remove_all(cache_base_path);
+    }
+}
+
+TEST_F(BlockFileCacheTest, populate_empty_cache_with_index) { // INDEX-only fill: up to `limit`, up to `cache_max`, then `exceed` bytes beyond it
+    if (fs::exists(cache_base_path)) { // start from a clean cache directory
+        fs::remove_all(cache_base_path);
+    }
+    fs::create_directories(cache_base_path);
+    TUniqueId query_id;
+    query_id.hi = 1;
+    query_id.lo = 1;
+    io::FileCacheSettings settings;
+
+    settings.ttl_queue_size = 5000000; // the four queue sizes below (5M + 3M + 1M + 1M) add up to settings.capacity
+    settings.ttl_queue_elements = 50000;
+    settings.query_queue_size = 3000000;
+    settings.query_queue_elements = 30000;
+    settings.index_queue_size = 1000000;
+    settings.index_queue_elements = 10000;
+    settings.disposable_queue_size = 1000000;
+    settings.disposable_queue_elements = 10000;
+    settings.capacity = 10000000;
+    settings.max_file_block_size = 100000; // same value as the 100000-byte requests issued below
+    settings.max_query_cache_size = 30;
+
+    size_t limit = 1000000; // same value as settings.index_queue_size
+    size_t cache_max = 10000000; // same value as settings.capacity
+    io::CacheContext context;
+    context.cache_type = io::FileCacheType::INDEX;
+    context.query_id = query_id;
+    // int64_t cur_time = UnixSeconds();
+    // context.expiration_time = cur_time + 120;
+    auto key1 = io::BlockFileCache::hash("key1");
+    io::BlockFileCache cache(cache_base_path, settings);
+    ASSERT_TRUE(cache.initialize());
+    int i = 0;
+    for (; i < 100; i++) { // poll up to 100 times, 1 ms apart, for the async open to succeed
+        if (cache.get_async_open_success()) {
+            break;
+        }
+        std::this_thread::sleep_for(std::chrono::milliseconds(1));
+    }
+    ASSERT_TRUE(cache.get_async_open_success()); // abort the test if the cache never opened
+    int64_t offset = 0;
+    // fill the cache up to `limit`
+    for (; offset < limit; offset += 100000) {
+        auto holder = cache.get_or_set(key1, offset, 100000, context);
+        auto blocks = fromHolder(holder);
+        ASSERT_EQ(blocks.size(), 1); // each request is expected to map to exactly one block
+
+        assert_range(1, blocks[0], io::FileBlock::Range(offset, offset + 
99999),
+                     io::FileBlock::State::EMPTY);
+        ASSERT_TRUE(blocks[0]->get_or_set_downloader() == 
io::FileBlock::get_caller_id()); // this caller must be the block's downloader
+        download(blocks[0]); // test helper defined elsewhere in this file; the block must be DOWNLOADED afterwards
+        assert_range(2, blocks[0], io::FileBlock::Range(offset, offset + 
99999),
+                     io::FileBlock::State::DOWNLOADED);
+
+        blocks.clear();
+    }
+    // keep adding blocks beyond `limit` until `cache_max` is reached
+    for (; offset < cache_max; offset += 100000) {
+        auto holder = cache.get_or_set(key1, offset, 100000, context);
+        auto blocks = fromHolder(holder);
+        ASSERT_EQ(blocks.size(), 1);
+
+        assert_range(3, blocks[0], io::FileBlock::Range(offset, offset + 
99999),
+                     io::FileBlock::State::EMPTY);
+        ASSERT_TRUE(blocks[0]->get_or_set_downloader() == 
io::FileBlock::get_caller_id());
+        download(blocks[0]);
+        assert_range(4, blocks[0], io::FileBlock::Range(offset, offset + 
99999),
+                     io::FileBlock::State::DOWNLOADED);
+
+        blocks.clear();
+    }
+    ASSERT_EQ(cache.get_stats_unsafe()["disposable_queue_curr_size"], 0);
+    ASSERT_EQ(cache.get_stats_unsafe()["ttl_queue_curr_size"], 0);
+    ASSERT_EQ(cache.get_stats_unsafe()["index_queue_curr_size"], cache_max); // the INDEX queue alone is expected to hold cache_max bytes
+    ASSERT_EQ(cache.get_stats_unsafe()["normal_queue_curr_size"], 0);
+    
ASSERT_EQ(cache._evict_by_self_lru_metrics_matrix[FileCacheType::INDEX]->get_value(),
 0); // the self-LRU eviction metric for INDEX is expected to still be 0
+
+    // request `exceed` more bytes beyond the cache capacity
+    size_t exceed = 2000000;
+    for (; offset < (cache_max + exceed); offset += 100000) {
+        auto holder = cache.get_or_set(key1, offset, 100000, context);
+        auto blocks = fromHolder(holder);
+        ASSERT_EQ(blocks.size(), 1);
+
+        assert_range(5, blocks[0], io::FileBlock::Range(offset, offset + 
99999),
+                     io::FileBlock::State::EMPTY);
+        ASSERT_TRUE(blocks[0]->get_or_set_downloader() == 
io::FileBlock::get_caller_id());
+        download(blocks[0]);
+        assert_range(6, blocks[0], io::FileBlock::Range(offset, offset + 
99999),
+                     io::FileBlock::State::DOWNLOADED);
+
+        blocks.clear();
+    }
+    ASSERT_EQ(cache.get_stats_unsafe()["disposable_queue_curr_size"], 0);
+    ASSERT_EQ(cache.get_stats_unsafe()["ttl_queue_curr_size"], 0);
+    ASSERT_EQ(cache.get_stats_unsafe()["index_queue_curr_size"], cache_max); // the queue size is expected to stay at cache_max
+    ASSERT_EQ(cache.get_stats_unsafe()["normal_queue_curr_size"], 0);
+    
ASSERT_EQ(cache._evict_by_self_lru_metrics_matrix[FileCacheType::INDEX]->get_value(),
 exceed); // the self-LRU eviction metric is expected to equal the bytes requested beyond capacity
+
+    if (fs::exists(cache_base_path)) { // remove the cache directory again
+        fs::remove_all(cache_base_path);
+    }
+}
+
+TEST_F(BlockFileCacheTest, populate_empty_cache_with_ttl) { // TTL-only fill: up to `limit`, up to `cache_max`, then `exceed` bytes beyond it
+    if (fs::exists(cache_base_path)) { // start from a clean cache directory
+        fs::remove_all(cache_base_path);
+    }
+    fs::create_directories(cache_base_path);
+    TUniqueId query_id;
+    query_id.hi = 1;
+    query_id.lo = 1;
+    io::FileCacheSettings settings;
+
+    settings.ttl_queue_size = 5000000; // the four queue sizes below (5M + 3M + 1M + 1M) add up to settings.capacity
+    settings.ttl_queue_elements = 50000;
+    settings.query_queue_size = 3000000;
+    settings.query_queue_elements = 30000;
+    settings.index_queue_size = 1000000;
+    settings.index_queue_elements = 10000;
+    settings.disposable_queue_size = 1000000;
+    settings.disposable_queue_elements = 10000;
+    settings.capacity = 10000000;
+    settings.max_file_block_size = 100000; // same value as the 100000-byte requests issued below
+    settings.max_query_cache_size = 30;
+
+    size_t limit = 5000000; // same value as settings.ttl_queue_size
+    size_t cache_max = 10000000; // same value as settings.capacity
+    io::CacheContext context;
+    context.cache_type = io::FileCacheType::TTL;
+    context.query_id = query_id;
+    int64_t cur_time = UnixSeconds();
+    context.expiration_time = cur_time + 120; // expiration is set 120 past the current UnixSeconds() value
+    auto key1 = io::BlockFileCache::hash("key1");
+    io::BlockFileCache cache(cache_base_path, settings);
+    ASSERT_TRUE(cache.initialize());
+    int i = 0;
+    for (; i < 100; i++) { // poll up to 100 times, 1 ms apart, for the async open to succeed
+        if (cache.get_async_open_success()) {
+            break;
+        }
+        std::this_thread::sleep_for(std::chrono::milliseconds(1));
+    }
+    ASSERT_TRUE(cache.get_async_open_success()); // abort the test if the cache never opened
+    int64_t offset = 0;
+    // fill the cache up to `limit`
+    for (; offset < limit; offset += 100000) {
+        auto holder = cache.get_or_set(key1, offset, 100000, context);
+        auto blocks = fromHolder(holder);
+        ASSERT_EQ(blocks.size(), 1); // each request is expected to map to exactly one block
+
+        assert_range(1, blocks[0], io::FileBlock::Range(offset, offset + 
99999),
+                     io::FileBlock::State::EMPTY);
+        ASSERT_TRUE(blocks[0]->get_or_set_downloader() == 
io::FileBlock::get_caller_id()); // this caller must be the block's downloader
+        download(blocks[0]); // test helper defined elsewhere in this file; the block must be DOWNLOADED afterwards
+        assert_range(2, blocks[0], io::FileBlock::Range(offset, offset + 
99999),
+                     io::FileBlock::State::DOWNLOADED);
+
+        blocks.clear();
+    }
+    // keep adding blocks beyond `limit` until `cache_max` is reached
+    for (; offset < cache_max; offset += 100000) {
+        auto holder = cache.get_or_set(key1, offset, 100000, context);
+        auto blocks = fromHolder(holder);
+        ASSERT_EQ(blocks.size(), 1);
+
+        assert_range(3, blocks[0], io::FileBlock::Range(offset, offset + 
99999),
+                     io::FileBlock::State::EMPTY);
+        ASSERT_TRUE(blocks[0]->get_or_set_downloader() == 
io::FileBlock::get_caller_id());
+        download(blocks[0]);
+        assert_range(4, blocks[0], io::FileBlock::Range(offset, offset + 
99999),
+                     io::FileBlock::State::DOWNLOADED);
+
+        blocks.clear();
+    }
+    ASSERT_EQ(cache.get_stats_unsafe()["disposable_queue_curr_size"], 0);
+    ASSERT_EQ(cache.get_stats_unsafe()["ttl_queue_curr_size"], cache_max); // the TTL queue alone is expected to hold cache_max bytes
+    ASSERT_EQ(cache.get_stats_unsafe()["index_queue_curr_size"], 0);
+    ASSERT_EQ(cache.get_stats_unsafe()["normal_queue_curr_size"], 0);
+    
ASSERT_EQ(cache._evict_by_self_lru_metrics_matrix[FileCacheType::TTL]->get_value(),
 0); // the self-LRU eviction metric for TTL is expected to still be 0
+
+    // request `exceed` more bytes beyond the cache capacity
+    size_t exceed = 2000000;
+    for (; offset < (cache_max + exceed); offset += 100000) {
+        auto holder = cache.get_or_set(key1, offset, 100000, context);
+        auto blocks = fromHolder(holder);
+        ASSERT_EQ(blocks.size(), 1);
+
+        assert_range(5, blocks[0], io::FileBlock::Range(offset, offset + 
99999),
+                     io::FileBlock::State::EMPTY);
+        ASSERT_TRUE(blocks[0]->get_or_set_downloader() == 
io::FileBlock::get_caller_id());
+        download(blocks[0]);
+        assert_range(6, blocks[0], io::FileBlock::Range(offset, offset + 
99999),
+                     io::FileBlock::State::DOWNLOADED);
+
+        blocks.clear();
+    }
+    ASSERT_EQ(cache.get_stats_unsafe()["disposable_queue_curr_size"], 0);
+    ASSERT_EQ(cache.get_stats_unsafe()["ttl_queue_curr_size"], cache_max); // the queue size is expected to stay at cache_max
+    ASSERT_EQ(cache.get_stats_unsafe()["index_queue_curr_size"], 0);
+    ASSERT_EQ(cache.get_stats_unsafe()["normal_queue_curr_size"], 0);
+    
ASSERT_EQ(cache._evict_by_self_lru_metrics_matrix[FileCacheType::TTL]->get_value(),
 exceed); // the self-LRU eviction metric is expected to equal the bytes requested beyond capacity
+
+    if (fs::exists(cache_base_path)) { // remove the cache directory again
+        fs::remove_all(cache_base_path);
+    }
+}
+
+TEST_F(BlockFileCacheTest, disposable_seize_after_normal) { // fills the cache with NORMAL blocks, then adds DISPOSABLE blocks and checks queue sizes and eviction metrics
+    if (fs::exists(cache_base_path)) { // start from a clean cache directory
+        fs::remove_all(cache_base_path);
+    }
+    fs::create_directories(cache_base_path);
+    TUniqueId query_id;
+    query_id.hi = 1;
+    query_id.lo = 1;
+    io::FileCacheSettings settings;
+
+    settings.ttl_queue_size = 5000000; // the four queue sizes below (5M + 3M + 1M + 1M) add up to settings.capacity
+    settings.ttl_queue_elements = 50000;
+    settings.query_queue_size = 3000000;
+    settings.query_queue_elements = 30000;
+    settings.index_queue_size = 1000000;
+    settings.index_queue_elements = 10000;
+    settings.disposable_queue_size = 1000000;
+    settings.disposable_queue_elements = 10000;
+    settings.capacity = 10000000;
+    settings.max_file_block_size = 100000; // same value as the 100000-byte requests issued below
+    settings.max_query_cache_size = 30;
+
+    io::BlockFileCache cache(cache_base_path, settings);
+    ASSERT_TRUE(cache.initialize());
+    int i = 0;
+    for (; i < 100; i++) { // poll up to 100 times, 1 ms apart, for the async open to succeed
+        if (cache.get_async_open_success()) {
+            break;
+        }
+        std::this_thread::sleep_for(std::chrono::milliseconds(1));
+    }
+    ASSERT_TRUE(cache.get_async_open_success()); // abort the test if the cache never opened
+
+    size_t limit = 1000000; // same value as settings.disposable_queue_size
+    size_t cache_max = 10000000; // same value as settings.capacity
+
+    io::CacheContext context1;
+    context1.cache_type = io::FileCacheType::NORMAL;
+    context1.query_id = query_id;
+    auto key1 = io::BlockFileCache::hash("key1");
+
+    int64_t offset = 0;
+    // fill the whole cache with NORMAL blocks
+    for (; offset < cache_max; offset += 100000) {
+        auto holder = cache.get_or_set(key1, offset, 100000, context1);
+        auto blocks = fromHolder(holder);
+        ASSERT_EQ(blocks.size(), 1); // each request is expected to map to exactly one block
+
+        assert_range(1, blocks[0], io::FileBlock::Range(offset, offset + 
99999),
+                     io::FileBlock::State::EMPTY);
+        ASSERT_TRUE(blocks[0]->get_or_set_downloader() == 
io::FileBlock::get_caller_id()); // this caller must be the block's downloader
+        download(blocks[0]); // test helper defined elsewhere in this file; the block must be DOWNLOADED afterwards
+        assert_range(2, blocks[0], io::FileBlock::Range(offset, offset + 
99999),
+                     io::FileBlock::State::DOWNLOADED);
+
+        blocks.clear();
+    }
+    ASSERT_EQ(cache.get_stats_unsafe()["disposable_queue_curr_size"], 0);
+    ASSERT_EQ(cache.get_stats_unsafe()["ttl_queue_curr_size"], 0);
+    ASSERT_EQ(cache.get_stats_unsafe()["index_queue_curr_size"], 0);
+    ASSERT_EQ(cache.get_stats_unsafe()["normal_queue_curr_size"], cache_max); // NORMAL alone is expected to hold cache_max bytes
+    // our hero comes to the stage: DISPOSABLE blocks under a different key
+    io::CacheContext context2;
+    context2.cache_type = io::FileCacheType::DISPOSABLE;
+    context2.query_id = query_id;
+    auto key2 = io::BlockFileCache::hash("key2");
+    offset = 0; // offsets restart at 0 because key2 is a separate key
+    for (; offset < limit; offset += 100000) {
+        auto holder = cache.get_or_set(key2, offset, 100000, context2);
+        auto blocks = fromHolder(holder);
+        ASSERT_EQ(blocks.size(), 1);
+
+        assert_range(3, blocks[0], io::FileBlock::Range(offset, offset + 
99999),
+                     io::FileBlock::State::EMPTY);
+        ASSERT_TRUE(blocks[0]->get_or_set_downloader() == 
io::FileBlock::get_caller_id());
+        download(blocks[0]);
+        assert_range(4, blocks[0], io::FileBlock::Range(offset, offset + 
99999),
+                     io::FileBlock::State::DOWNLOADED);
+
+        blocks.clear();
+    }
+    ASSERT_EQ(cache.get_stats_unsafe()["disposable_queue_curr_size"], limit); // DISPOSABLE is expected to hold `limit` bytes
+    ASSERT_EQ(cache.get_stats_unsafe()["ttl_queue_curr_size"], 0);
+    ASSERT_EQ(cache.get_stats_unsafe()["index_queue_curr_size"], 0);
+    ASSERT_EQ(cache.get_stats_unsafe()["normal_queue_curr_size"], cache_max - 
limit); // NORMAL is expected to have shrunk by the same amount
+    
ASSERT_EQ(cache._evict_by_size_metrics_matrix[FileCacheType::NORMAL][FileCacheType::DISPOSABLE]
+                      ->get_value(),
+              limit); // the [NORMAL][DISPOSABLE] evict-by-size metric is expected to equal `limit`
+
+    // request `exceed` more DISPOSABLE bytes beyond `limit`
+    size_t exceed = 2000000;
+    for (; offset < (limit + exceed); offset += 100000) {
+        auto holder = cache.get_or_set(key2, offset, 100000, context2);
+        auto blocks = fromHolder(holder);
+        ASSERT_EQ(blocks.size(), 1);
+
+        assert_range(5, blocks[0], io::FileBlock::Range(offset, offset + 
99999),
+                     io::FileBlock::State::EMPTY);
+        ASSERT_TRUE(blocks[0]->get_or_set_downloader() == 
io::FileBlock::get_caller_id());
+        download(blocks[0]);
+        assert_range(6, blocks[0], io::FileBlock::Range(offset, offset + 
99999),
+                     io::FileBlock::State::DOWNLOADED);
+
+        blocks.clear();
+    }
+    ASSERT_EQ(cache.get_stats_unsafe()["disposable_queue_curr_size"], limit); // DISPOSABLE is expected to stay at `limit`
+    ASSERT_EQ(cache.get_stats_unsafe()["ttl_queue_curr_size"], 0);
+    ASSERT_EQ(cache.get_stats_unsafe()["index_queue_curr_size"], 0);
+    ASSERT_EQ(cache.get_stats_unsafe()["normal_queue_curr_size"], cache_max - 
limit); // NORMAL is expected to be unchanged by the extra requests
+    
ASSERT_EQ(cache._evict_by_self_lru_metrics_matrix[FileCacheType::DISPOSABLE]->get_value(),
+              exceed); // the self-LRU eviction metric for DISPOSABLE is expected to equal `exceed`
+
+    if (fs::exists(cache_base_path)) { // remove the cache directory again
+        fs::remove_all(cache_base_path);
+    }
+}
+
+TEST_F(BlockFileCacheTest, seize_after_full) { // table-driven: fill the cache with one type, then add blocks of a second type, for 12 type pairs
+    struct Args {
+        io::FileCacheType first_type; // type used to fill the whole cache
+        io::FileCacheType second_type; // type added afterwards
+        size_t second_limit; // bytes of second_type expected to be held; values mirror the queue sizes configured below
+        std::string first_metrics; // get_stats_unsafe() key for first_type's queue size
+        std::string second_metrics; // get_stats_unsafe() key for second_type's queue size
+    };
+
+    std::vector<Args> args_vec = { // every ordered pair of distinct types among NORMAL, DISPOSABLE, INDEX, TTL
+            {io::FileCacheType::NORMAL, io::FileCacheType::DISPOSABLE, 1000000,
+             "normal_queue_curr_size", "disposable_queue_curr_size"},
+            {io::FileCacheType::NORMAL, io::FileCacheType::INDEX, 1000000, 
"normal_queue_curr_size",
+             "index_queue_curr_size"},
+            {io::FileCacheType::NORMAL, io::FileCacheType::TTL, 5000000, 
"normal_queue_curr_size",
+             "ttl_queue_curr_size"},
+            {io::FileCacheType::DISPOSABLE, io::FileCacheType::NORMAL, 3000000,
+             "disposable_queue_curr_size", "normal_queue_curr_size"},
+            {io::FileCacheType::DISPOSABLE, io::FileCacheType::INDEX, 1000000,
+             "disposable_queue_curr_size", "index_queue_curr_size"},
+            {io::FileCacheType::DISPOSABLE, io::FileCacheType::TTL, 5000000,
+             "disposable_queue_curr_size", "ttl_queue_curr_size"},
+            {io::FileCacheType::INDEX, io::FileCacheType::NORMAL, 3000000, 
"index_queue_curr_size",
+             "normal_queue_curr_size"},
+            {io::FileCacheType::INDEX, io::FileCacheType::DISPOSABLE, 1000000,
+             "index_queue_curr_size", "disposable_queue_curr_size"},
+            {io::FileCacheType::INDEX, io::FileCacheType::TTL, 5000000, 
"index_queue_curr_size",
+             "ttl_queue_curr_size"},
+            {io::FileCacheType::TTL, io::FileCacheType::NORMAL, 3000000, 
"ttl_queue_curr_size",
+             "normal_queue_curr_size"},
+            {io::FileCacheType::TTL, io::FileCacheType::DISPOSABLE, 1000000, 
"ttl_queue_curr_size",
+             "disposable_queue_curr_size"},
+            {io::FileCacheType::TTL, io::FileCacheType::INDEX, 1000000, 
"ttl_queue_curr_size",
+             "index_queue_curr_size"},
+    };
+
+    for (auto& args : args_vec) { // each iteration builds a fresh cache in a fresh directory
+        std::cout << "filled with " << 
io::BlockFileCache::cache_type_to_string(args.first_type)
+                  << " and seize with "
+                  << 
io::BlockFileCache::cache_type_to_string(args.second_type) << std::endl; // log which pair is running
+        if (fs::exists(cache_base_path)) { // start from a clean cache directory
+            fs::remove_all(cache_base_path);
+        }
+        fs::create_directories(cache_base_path);
+        TUniqueId query_id;
+        query_id.hi = 1;
+        query_id.lo = 1;
+        io::FileCacheSettings settings;
+
+        settings.ttl_queue_size = 5000000; // the four queue sizes below (5M + 3M + 1M + 1M) add up to settings.capacity
+        settings.ttl_queue_elements = 50000;
+        settings.query_queue_size = 3000000;
+        settings.query_queue_elements = 30000;
+        settings.index_queue_size = 1000000;
+        settings.index_queue_elements = 10000;
+        settings.disposable_queue_size = 1000000;
+        settings.disposable_queue_elements = 10000;
+        settings.capacity = 10000000;
+        settings.max_file_block_size = 100000; // same value as the 100000-byte requests issued below
+        settings.max_query_cache_size = 30;
+
+        io::BlockFileCache cache(cache_base_path, settings);
+        ASSERT_TRUE(cache.initialize());
+        int i = 0;
+        for (; i < 100; i++) { // poll up to 100 times, 1 ms apart, for the async open to succeed
+            if (cache.get_async_open_success()) {
+                break;
+            }
+            std::this_thread::sleep_for(std::chrono::milliseconds(1));
+        }
+        ASSERT_TRUE(cache.get_async_open_success()); // abort the test if the cache never opened
+
+        size_t limit = args.second_limit;
+        size_t cache_max = 10000000; // same value as settings.capacity
+
+        io::CacheContext context1;
+        context1.cache_type = args.first_type;
+        context1.query_id = query_id;
+        if (args.first_type == io::FileCacheType::TTL) { // only the TTL case sets an expiration time
+            int64_t cur_time = UnixSeconds();
+            context1.expiration_time = cur_time + 120;
+        }
+        auto key1 = io::BlockFileCache::hash("key1");
+
+        int64_t offset = 0;
+        // fill the whole cache with blocks of first_type
+        for (; offset < cache_max; offset += 100000) {
+            auto holder = cache.get_or_set(key1, offset, 100000, context1);
+            auto blocks = fromHolder(holder);
+            ASSERT_EQ(blocks.size(), 1); // each request is expected to map to exactly one block
+
+            assert_range(1, blocks[0], io::FileBlock::Range(offset, offset + 
99999),
+                         io::FileBlock::State::EMPTY);
+            ASSERT_TRUE(blocks[0]->get_or_set_downloader() == 
io::FileBlock::get_caller_id()); // this caller must be the block's downloader
+            download(blocks[0]); // test helper defined elsewhere in this file; the block must be DOWNLOADED afterwards
+            assert_range(2, blocks[0], io::FileBlock::Range(offset, offset + 
99999),
+                         io::FileBlock::State::DOWNLOADED);
+
+            blocks.clear();
+        }
+        ASSERT_EQ(cache.get_stats_unsafe()[args.first_metrics], cache_max); // first_type alone is expected to hold cache_max bytes
+        // our hero comes to the stage: blocks of second_type under a different key
+        io::CacheContext context2;
+        context2.cache_type = args.second_type;
+        context2.query_id = query_id;
+        if (context2.cache_type == io::FileCacheType::TTL) { // only the TTL case sets an expiration time
+            int64_t cur_time = UnixSeconds();
+            context2.expiration_time = cur_time + 120;
+        }
+        auto key2 = io::BlockFileCache::hash("key2");
+        offset = 0; // offsets restart at 0 because key2 is a separate key
+        for (; offset < limit; offset += 100000) {
+            auto holder = cache.get_or_set(key2, offset, 100000, context2);
+            auto blocks = fromHolder(holder);
+            ASSERT_EQ(blocks.size(), 1);
+
+            assert_range(3, blocks[0], io::FileBlock::Range(offset, offset + 
99999),
+                         io::FileBlock::State::EMPTY);
+            ASSERT_TRUE(blocks[0]->get_or_set_downloader() == 
io::FileBlock::get_caller_id());
+            download(blocks[0]);
+            assert_range(4, blocks[0], io::FileBlock::Range(offset, offset + 
99999),
+                         io::FileBlock::State::DOWNLOADED);
+
+            blocks.clear();
+        }
+        ASSERT_EQ(cache.get_stats_unsafe()[args.second_metrics], limit); // second_type is expected to hold `limit` bytes
+        ASSERT_EQ(cache.get_stats_unsafe()[args.first_metrics], cache_max - 
limit); // first_type is expected to have shrunk by the same amount
+        ASSERT_EQ(
+                
cache._evict_by_size_metrics_matrix[args.first_type][args.second_type]->get_value(),
+                limit); // the [first_type][second_type] evict-by-size metric is expected to equal `limit`
+
+        // request `exceed` more bytes of second_type beyond `limit`
+        size_t exceed = 2000000;
+        for (; offset < (limit + exceed); offset += 100000) {
+            auto holder = cache.get_or_set(key2, offset, 100000, context2);
+            auto blocks = fromHolder(holder);
+            ASSERT_EQ(blocks.size(), 1);
+
+            assert_range(5, blocks[0], io::FileBlock::Range(offset, offset + 
99999),
+                         io::FileBlock::State::EMPTY);
+            ASSERT_TRUE(blocks[0]->get_or_set_downloader() == 
io::FileBlock::get_caller_id());
+            download(blocks[0]);
+            assert_range(6, blocks[0], io::FileBlock::Range(offset, offset + 
99999),
+                         io::FileBlock::State::DOWNLOADED);
+
+            blocks.clear();
+        }
+        ASSERT_EQ(cache.get_stats_unsafe()[args.second_metrics], limit); // second_type is expected to stay at `limit`
+        ASSERT_EQ(cache.get_stats_unsafe()[args.first_metrics], cache_max - 
limit); // first_type is expected to be unchanged by the extra requests
+        
ASSERT_EQ(cache._evict_by_self_lru_metrics_matrix[args.second_type]->get_value(),
 exceed); // the self-LRU eviction metric for second_type is expected to equal `exceed`
+
+        if (fs::exists(cache_base_path)) { // remove the cache directory before the next pair
+            fs::remove_all(cache_base_path);
+        }
+    }
+}
+
+TEST_F(BlockFileCacheTest, evict_privilege_order_for_disposable) { // DISPOSABLE is the late arrival: it is inserted after NORMAL, INDEX and TTL already fill the cache
+    if (fs::exists(cache_base_path)) { // wipe any leftover cache directory
+        fs::remove_all(cache_base_path);
+    }
+    fs::create_directories(cache_base_path);
+    TUniqueId query_id;
+    query_id.hi = 1;
+    query_id.lo = 1;
+    io::FileCacheSettings settings;
+    // Queue limits: the four *_queue_size values below add up to capacity (10000000)
+    settings.ttl_queue_size = 5000000;
+    settings.ttl_queue_elements = 50000;
+    settings.query_queue_size = 3000000;
+    settings.query_queue_elements = 30000;
+    settings.index_queue_size = 1000000;
+    settings.index_queue_elements = 10000;
+    settings.disposable_queue_size = 1000000;
+    settings.disposable_queue_elements = 10000;
+    settings.capacity = 10000000;
+    settings.max_file_block_size = 100000; // same as the request size used in every loop below
+    settings.max_query_cache_size = 30;
+
+    io::BlockFileCache cache(cache_base_path, settings);
+    ASSERT_TRUE(cache.initialize());
+    int i = 0; // poll for the async open: at most 100 tries, 1 ms apart
+    for (; i < 100; i++) {
+        if (cache.get_async_open_success()) {
+            break;
+        }
+        std::this_thread::sleep_for(std::chrono::milliseconds(1));
+    }
+    ASSERT_TRUE(cache.get_async_open_success());
+    // Stage 1: 3500000 bytes of NORMAL blocks under key1
+    io::CacheContext context1;
+    context1.cache_type = io::FileCacheType::NORMAL;
+    context1.query_id = query_id;
+    auto key1 = io::BlockFileCache::hash("key1");
+
+    int64_t offset = 0;
+
+    for (; offset < 3500000; offset += 100000) {
+        auto holder = cache.get_or_set(key1, offset, 100000, context1);
+        auto blocks = fromHolder(holder);
+        ASSERT_EQ(blocks.size(), 1); // each 100000-byte request yields a single block
+
+        assert_range(1, blocks[0], io::FileBlock::Range(offset, offset + 
99999),
+                     io::FileBlock::State::EMPTY); // expected state before the download
+        ASSERT_TRUE(blocks[0]->get_or_set_downloader() == 
io::FileBlock::get_caller_id());
+        download(blocks[0]); // helper defined outside this chunk
+        assert_range(2, blocks[0], io::FileBlock::Range(offset, offset + 
99999),
+                     io::FileBlock::State::DOWNLOADED); // expected state after the download
+
+        blocks.clear();
+    }
+    io::CacheContext context2; // Stage 2: 1300000 bytes of INDEX blocks under key2
+    context2.cache_type = io::FileCacheType::INDEX;
+    context2.query_id = query_id;
+    auto key2 = io::BlockFileCache::hash("key2");
+
+    offset = 0;
+
+    for (; offset < 1300000; offset += 100000) {
+        auto holder = cache.get_or_set(key2, offset, 100000, context2);
+        auto blocks = fromHolder(holder);
+        ASSERT_EQ(blocks.size(), 1);
+
+        assert_range(1, blocks[0], io::FileBlock::Range(offset, offset + 
99999),
+                     io::FileBlock::State::EMPTY);
+        ASSERT_TRUE(blocks[0]->get_or_set_downloader() == 
io::FileBlock::get_caller_id());
+        download(blocks[0]);
+        assert_range(2, blocks[0], io::FileBlock::Range(offset, offset + 
99999),
+                     io::FileBlock::State::DOWNLOADED);
+
+        blocks.clear();
+    }
+    io::CacheContext context3; // Stage 3: 5200000 bytes of TTL blocks under key3
+    context3.cache_type = io::FileCacheType::TTL;
+    context3.query_id = query_id;
+    context3.expiration_time = UnixSeconds() + 120; // 120 past the current UnixSeconds() value
+    auto key3 = io::BlockFileCache::hash("key3");
+
+    offset = 0;
+
+    for (; offset < 5200000; offset += 100000) {
+        auto holder = cache.get_or_set(key3, offset, 100000, context3);
+        auto blocks = fromHolder(holder);
+        ASSERT_EQ(blocks.size(), 1);
+
+        assert_range(1, blocks[0], io::FileBlock::Range(offset, offset + 
99999),
+                     io::FileBlock::State::EMPTY);
+        ASSERT_TRUE(blocks[0]->get_or_set_downloader() == 
io::FileBlock::get_caller_id());
+        download(blocks[0]);
+        assert_range(2, blocks[0], io::FileBlock::Range(offset, offset + 
99999),
+                     io::FileBlock::State::DOWNLOADED);
+
+        blocks.clear();
+    }
+    ASSERT_EQ(cache.get_stats_unsafe()["disposable_queue_curr_size"], 0); // 3500000 + 1300000 + 5200000 == capacity; all inserted bytes are still accounted for
+    ASSERT_EQ(cache.get_stats_unsafe()["ttl_queue_curr_size"], 5200000);
+    ASSERT_EQ(cache.get_stats_unsafe()["index_queue_curr_size"], 1300000);
+    ASSERT_EQ(cache.get_stats_unsafe()["normal_queue_curr_size"], 3500000);
+
+    // our hero comes to the stage: DISPOSABLE blocks are requested while the other three queues already add up to capacity
+    io::CacheContext context4;
+    context4.cache_type = io::FileCacheType::DISPOSABLE;
+    context4.query_id = query_id;
+    auto key4 = io::BlockFileCache::hash("key4");
+
+    offset = 0;
+
+    for (; offset < 1000000; offset += 100000) { // 1000000 bytes == disposable_queue_size
+        auto holder = cache.get_or_set(key4, offset, 100000, context4);
+        auto blocks = fromHolder(holder);
+        ASSERT_EQ(blocks.size(), 1);
+
+        assert_range(1, blocks[0], io::FileBlock::Range(offset, offset + 
99999),
+                     io::FileBlock::State::EMPTY);
+        ASSERT_TRUE(blocks[0]->get_or_set_downloader() == 
io::FileBlock::get_caller_id());
+        download(blocks[0]);
+        assert_range(2, blocks[0], io::FileBlock::Range(offset, offset + 
99999),
+                     io::FileBlock::State::DOWNLOADED);
+
+        blocks.clear();
+    }
+    ASSERT_EQ(cache.get_stats_unsafe()["disposable_queue_curr_size"], 1000000); // expected sizes now equal the configured limits (3000000 matches query_queue_size)
+    ASSERT_EQ(cache.get_stats_unsafe()["ttl_queue_curr_size"], 5000000);
+    ASSERT_EQ(cache.get_stats_unsafe()["index_queue_curr_size"], 1000000);
+    ASSERT_EQ(cache.get_stats_unsafe()["normal_queue_curr_size"], 3000000);
+    // Expected counters: 500000 + 300000 + 200000 == the 1000000 bytes just inserted
ASSERT_EQ(cache._evict_by_size_metrics_matrix[FileCacheType::NORMAL][FileCacheType::DISPOSABLE]
+                      ->get_value(),
+              500000);
+    
ASSERT_EQ(cache._evict_by_size_metrics_matrix[FileCacheType::INDEX][FileCacheType::DISPOSABLE]
+                      ->get_value(),
+              300000);
+    
ASSERT_EQ(cache._evict_by_size_metrics_matrix[FileCacheType::TTL][FileCacheType::DISPOSABLE]
+                      ->get_value(),
+              200000);
+
+    size_t exceed = 200000; // 200000 extra bytes == two more 100000-byte blocks
+    for (; offset < (1000000 + exceed); offset += 100000) {
+        auto holder = cache.get_or_set(key4, offset, 100000, context4);
+        auto blocks = fromHolder(holder);
+        ASSERT_EQ(blocks.size(), 1);
+
+        assert_range(3, blocks[0], io::FileBlock::Range(offset, offset + 
99999),
+                     io::FileBlock::State::EMPTY);
+        ASSERT_TRUE(blocks[0]->get_or_set_downloader() == 
io::FileBlock::get_caller_id());
+        download(blocks[0]);
+        assert_range(4, blocks[0], io::FileBlock::Range(offset, offset + 
99999),
+                     io::FileBlock::State::DOWNLOADED);
+
+        blocks.clear();
+    }
+    ASSERT_EQ(cache.get_stats_unsafe()["disposable_queue_curr_size"], 1000000); // queue sizes are expected to be unchanged by the overflow
+    ASSERT_EQ(cache.get_stats_unsafe()["ttl_queue_curr_size"], 5000000);
+    ASSERT_EQ(cache.get_stats_unsafe()["index_queue_curr_size"], 1000000);
+    ASSERT_EQ(cache.get_stats_unsafe()["normal_queue_curr_size"], 3000000);
+    // The three cross-queue counters are expected to keep their earlier values...
ASSERT_EQ(cache._evict_by_size_metrics_matrix[FileCacheType::NORMAL][FileCacheType::DISPOSABLE]
+                      ->get_value(),
+              500000);
+    
ASSERT_EQ(cache._evict_by_size_metrics_matrix[FileCacheType::INDEX][FileCacheType::DISPOSABLE]
+                      ->get_value(),
+              300000);
+    
ASSERT_EQ(cache._evict_by_size_metrics_matrix[FileCacheType::TTL][FileCacheType::DISPOSABLE]
+                      ->get_value(),
+              200000);
+    // ...while the DISPOSABLE self-LRU counter is expected to equal the overflow
ASSERT_EQ(cache._evict_by_self_lru_metrics_matrix[FileCacheType::DISPOSABLE]->get_value(),
+              exceed);
+
+    if (fs::exists(cache_base_path)) { // clean up the cache directory
+        fs::remove_all(cache_base_path);
+    }
+}
+
+TEST_F(BlockFileCacheTest, evict_privilege_order_for_normal) { // NORMAL is the late arrival: it is inserted after DISPOSABLE, INDEX and TTL already fill the cache
+    if (fs::exists(cache_base_path)) { // wipe any leftover cache directory
+        fs::remove_all(cache_base_path);
+    }
+    fs::create_directories(cache_base_path);
+    TUniqueId query_id;
+    query_id.hi = 1;
+    query_id.lo = 1;
+    io::FileCacheSettings settings;
+    // Queue limits: the four *_queue_size values below add up to capacity (10000000)
+    settings.ttl_queue_size = 5000000;
+    settings.ttl_queue_elements = 50000;
+    settings.query_queue_size = 3000000;
+    settings.query_queue_elements = 30000;
+    settings.index_queue_size = 1000000;
+    settings.index_queue_elements = 10000;
+    settings.disposable_queue_size = 1000000;
+    settings.disposable_queue_elements = 10000;
+    settings.capacity = 10000000;
+    settings.max_file_block_size = 100000; // same as the request size used in every loop below
+    settings.max_query_cache_size = 30;
+
+    io::BlockFileCache cache(cache_base_path, settings);
+    ASSERT_TRUE(cache.initialize());
+    int i = 0; // poll for the async open: at most 100 tries, 1 ms apart
+    for (; i < 100; i++) {
+        if (cache.get_async_open_success()) {
+            break;
+        }
+        std::this_thread::sleep_for(std::chrono::milliseconds(1));
+    }
+    ASSERT_TRUE(cache.get_async_open_success());
+    // Stage 1: 1500000 bytes of DISPOSABLE blocks under key1
+    io::CacheContext context1;
+    context1.cache_type = io::FileCacheType::DISPOSABLE;
+    context1.query_id = query_id;
+    auto key1 = io::BlockFileCache::hash("key1");
+
+    int64_t offset = 0;
+
+    for (; offset < 1500000; offset += 100000) {
+        auto holder = cache.get_or_set(key1, offset, 100000, context1);
+        auto blocks = fromHolder(holder);
+        ASSERT_EQ(blocks.size(), 1); // each 100000-byte request yields a single block
+
+        assert_range(1, blocks[0], io::FileBlock::Range(offset, offset + 
99999),
+                     io::FileBlock::State::EMPTY); // expected state before the download
+        ASSERT_TRUE(blocks[0]->get_or_set_downloader() == 
io::FileBlock::get_caller_id());
+        download(blocks[0]); // helper defined outside this chunk
+        assert_range(2, blocks[0], io::FileBlock::Range(offset, offset + 
99999),
+                     io::FileBlock::State::DOWNLOADED); // expected state after the download
+
+        blocks.clear();
+    }
+    io::CacheContext context2; // Stage 2: 1300000 bytes of INDEX blocks under key2
+    context2.cache_type = io::FileCacheType::INDEX;
+    context2.query_id = query_id;
+    auto key2 = io::BlockFileCache::hash("key2");
+
+    offset = 0;
+
+    for (; offset < 1300000; offset += 100000) {
+        auto holder = cache.get_or_set(key2, offset, 100000, context2);
+        auto blocks = fromHolder(holder);
+        ASSERT_EQ(blocks.size(), 1);
+
+        assert_range(1, blocks[0], io::FileBlock::Range(offset, offset + 
99999),
+                     io::FileBlock::State::EMPTY);
+        ASSERT_TRUE(blocks[0]->get_or_set_downloader() == 
io::FileBlock::get_caller_id());
+        download(blocks[0]);
+        assert_range(2, blocks[0], io::FileBlock::Range(offset, offset + 
99999),
+                     io::FileBlock::State::DOWNLOADED);
+
+        blocks.clear();
+    }
+    io::CacheContext context3; // Stage 3: 7200000 bytes of TTL blocks under key3
+    context3.cache_type = io::FileCacheType::TTL;
+    context3.query_id = query_id;
+    context3.expiration_time = UnixSeconds() + 120; // 120 past the current UnixSeconds() value
+    auto key3 = io::BlockFileCache::hash("key3");
+
+    offset = 0;
+
+    for (; offset < 7200000; offset += 100000) {
+        auto holder = cache.get_or_set(key3, offset, 100000, context3);
+        auto blocks = fromHolder(holder);
+        ASSERT_EQ(blocks.size(), 1);
+
+        assert_range(1, blocks[0], io::FileBlock::Range(offset, offset + 
99999),
+                     io::FileBlock::State::EMPTY);
+        ASSERT_TRUE(blocks[0]->get_or_set_downloader() == 
io::FileBlock::get_caller_id());
+        download(blocks[0]);
+        assert_range(2, blocks[0], io::FileBlock::Range(offset, offset + 
99999),
+                     io::FileBlock::State::DOWNLOADED);
+
+        blocks.clear();
+    }
+    ASSERT_EQ(cache.get_stats_unsafe()["disposable_queue_curr_size"], 1500000); // 1500000 + 1300000 + 7200000 == capacity; DISPOSABLE, INDEX and TTL are all expected above their own *_queue_size here
+    ASSERT_EQ(cache.get_stats_unsafe()["ttl_queue_curr_size"], 7200000);
+    ASSERT_EQ(cache.get_stats_unsafe()["index_queue_curr_size"], 1300000);
+    ASSERT_EQ(cache.get_stats_unsafe()["normal_queue_curr_size"], 0);
+
+    // our hero comes to the stage: NORMAL blocks are requested while the other three queues already add up to capacity
+    io::CacheContext context4;
+    context4.cache_type = io::FileCacheType::NORMAL;
+    context4.query_id = query_id;
+    auto key4 = io::BlockFileCache::hash("key4");
+
+    offset = 0;
+
+    for (; offset < 3000000; offset += 100000) { // 3000000 bytes == query_queue_size
+        auto holder = cache.get_or_set(key4, offset, 100000, context4);
+        auto blocks = fromHolder(holder);
+        ASSERT_EQ(blocks.size(), 1);
+
+        assert_range(1, blocks[0], io::FileBlock::Range(offset, offset + 
99999),
+                     io::FileBlock::State::EMPTY);
+        ASSERT_TRUE(blocks[0]->get_or_set_downloader() == 
io::FileBlock::get_caller_id());
+        download(blocks[0]);
+        assert_range(2, blocks[0], io::FileBlock::Range(offset, offset + 
99999),
+                     io::FileBlock::State::DOWNLOADED);
+
+        blocks.clear();
+    }
+    ASSERT_EQ(cache.get_stats_unsafe()["disposable_queue_curr_size"], 1000000); // expected sizes now equal the configured limits (3000000 matches query_queue_size)
+    ASSERT_EQ(cache.get_stats_unsafe()["ttl_queue_curr_size"], 5000000);
+    ASSERT_EQ(cache.get_stats_unsafe()["index_queue_curr_size"], 1000000);
+    ASSERT_EQ(cache.get_stats_unsafe()["normal_queue_curr_size"], 3000000);
+    // Expected counters: 500000 + 300000 + 2200000 == the 3000000 bytes just inserted
ASSERT_EQ(cache._evict_by_size_metrics_matrix[FileCacheType::DISPOSABLE][FileCacheType::NORMAL]
+                      ->get_value(),
+              500000);
+    
ASSERT_EQ(cache._evict_by_size_metrics_matrix[FileCacheType::INDEX][FileCacheType::NORMAL]
+                      ->get_value(),
+              300000);
+    
ASSERT_EQ(cache._evict_by_size_metrics_matrix[FileCacheType::TTL][FileCacheType::NORMAL]
+                      ->get_value(),
+              2200000);
+
+    size_t exceed = 200000; // 200000 extra bytes == two more 100000-byte blocks
+    for (; offset < (3000000 + exceed); offset += 100000) {
+        auto holder = cache.get_or_set(key4, offset, 100000, context4);
+        auto blocks = fromHolder(holder);
+        ASSERT_EQ(blocks.size(), 1);
+
+        assert_range(3, blocks[0], io::FileBlock::Range(offset, offset + 
99999),
+                     io::FileBlock::State::EMPTY);
+        ASSERT_TRUE(blocks[0]->get_or_set_downloader() == 
io::FileBlock::get_caller_id());
+        download(blocks[0]);
+        assert_range(4, blocks[0], io::FileBlock::Range(offset, offset + 
99999),
+                     io::FileBlock::State::DOWNLOADED);
+
+        blocks.clear();
+    }
+    ASSERT_EQ(cache.get_stats_unsafe()["disposable_queue_curr_size"], 1000000); // queue sizes are expected to be unchanged by the overflow
+    ASSERT_EQ(cache.get_stats_unsafe()["ttl_queue_curr_size"], 5000000);
+    ASSERT_EQ(cache.get_stats_unsafe()["index_queue_curr_size"], 1000000);
+    ASSERT_EQ(cache.get_stats_unsafe()["normal_queue_curr_size"], 3000000);
+    // The three cross-queue counters are expected to keep their earlier values...
ASSERT_EQ(cache._evict_by_size_metrics_matrix[FileCacheType::DISPOSABLE][FileCacheType::NORMAL]
+                      ->get_value(),
+              500000);
+    
ASSERT_EQ(cache._evict_by_size_metrics_matrix[FileCacheType::INDEX][FileCacheType::NORMAL]
+                      ->get_value(),
+              300000);
+    
ASSERT_EQ(cache._evict_by_size_metrics_matrix[FileCacheType::TTL][FileCacheType::NORMAL]
+                      ->get_value(),
+              2200000);
+    // ...while the NORMAL self-LRU counter is expected to equal the overflow
ASSERT_EQ(cache._evict_by_self_lru_metrics_matrix[FileCacheType::NORMAL]->get_value(),
 exceed);
+
+    if (fs::exists(cache_base_path)) { // clean up the cache directory
+        fs::remove_all(cache_base_path);
+    }
+}
+
+TEST_F(BlockFileCacheTest, evict_privilege_order_for_index) { // INDEX is the late arrival: it is inserted after DISPOSABLE, NORMAL and TTL already fill the cache
+    if (fs::exists(cache_base_path)) { // wipe any leftover cache directory
+        fs::remove_all(cache_base_path);
+    }
+    fs::create_directories(cache_base_path);
+    TUniqueId query_id;
+    query_id.hi = 1;
+    query_id.lo = 1;
+    io::FileCacheSettings settings;
+    // Queue limits: the four *_queue_size values below add up to capacity (10000000)
+    settings.ttl_queue_size = 5000000;
+    settings.ttl_queue_elements = 50000;
+    settings.query_queue_size = 3000000;
+    settings.query_queue_elements = 30000;
+    settings.index_queue_size = 1000000;
+    settings.index_queue_elements = 10000;
+    settings.disposable_queue_size = 1000000;
+    settings.disposable_queue_elements = 10000;
+    settings.capacity = 10000000;
+    settings.max_file_block_size = 100000; // same as the request size used in every loop below
+    settings.max_query_cache_size = 30;
+
+    io::BlockFileCache cache(cache_base_path, settings);
+    ASSERT_TRUE(cache.initialize());
+    int i = 0; // poll for the async open: at most 100 tries, 1 ms apart
+    for (; i < 100; i++) {
+        if (cache.get_async_open_success()) {
+            break;
+        }
+        std::this_thread::sleep_for(std::chrono::milliseconds(1));
+    }
+    ASSERT_TRUE(cache.get_async_open_success());
+    // Stage 1: 1500000 bytes of DISPOSABLE blocks under key1
+    io::CacheContext context1;
+    context1.cache_type = io::FileCacheType::DISPOSABLE;
+    context1.query_id = query_id;
+    auto key1 = io::BlockFileCache::hash("key1");
+
+    int64_t offset = 0;
+
+    for (; offset < 1500000; offset += 100000) {
+        auto holder = cache.get_or_set(key1, offset, 100000, context1);
+        auto blocks = fromHolder(holder);
+        ASSERT_EQ(blocks.size(), 1); // each 100000-byte request yields a single block
+
+        assert_range(1, blocks[0], io::FileBlock::Range(offset, offset + 
99999),
+                     io::FileBlock::State::EMPTY); // expected state before the download
+        ASSERT_TRUE(blocks[0]->get_or_set_downloader() == 
io::FileBlock::get_caller_id());
+        download(blocks[0]); // helper defined outside this chunk
+        assert_range(2, blocks[0], io::FileBlock::Range(offset, offset + 
99999),
+                     io::FileBlock::State::DOWNLOADED); // expected state after the download
+
+        blocks.clear();
+    }
+    io::CacheContext context2; // Stage 2: 3300000 bytes of NORMAL blocks under key2
+    context2.cache_type = io::FileCacheType::NORMAL;
+    context2.query_id = query_id;
+    auto key2 = io::BlockFileCache::hash("key2");
+
+    offset = 0;
+
+    for (; offset < 3300000; offset += 100000) {
+        auto holder = cache.get_or_set(key2, offset, 100000, context2);
+        auto blocks = fromHolder(holder);
+        ASSERT_EQ(blocks.size(), 1);
+
+        assert_range(1, blocks[0], io::FileBlock::Range(offset, offset + 
99999),
+                     io::FileBlock::State::EMPTY);
+        ASSERT_TRUE(blocks[0]->get_or_set_downloader() == 
io::FileBlock::get_caller_id());
+        download(blocks[0]);
+        assert_range(2, blocks[0], io::FileBlock::Range(offset, offset + 
99999),
+                     io::FileBlock::State::DOWNLOADED);
+
+        blocks.clear();
+    }
+    io::CacheContext context3; // Stage 3: 5200000 bytes of TTL blocks under key3
+    context3.cache_type = io::FileCacheType::TTL;
+    context3.query_id = query_id;
+    context3.expiration_time = UnixSeconds() + 120; // 120 past the current UnixSeconds() value
+    auto key3 = io::BlockFileCache::hash("key3");
+
+    offset = 0;
+
+    for (; offset < 5200000; offset += 100000) {
+        auto holder = cache.get_or_set(key3, offset, 100000, context3);
+        auto blocks = fromHolder(holder);
+        ASSERT_EQ(blocks.size(), 1);
+
+        assert_range(1, blocks[0], io::FileBlock::Range(offset, offset + 
99999),
+                     io::FileBlock::State::EMPTY);
+        ASSERT_TRUE(blocks[0]->get_or_set_downloader() == 
io::FileBlock::get_caller_id());
+        download(blocks[0]);
+        assert_range(2, blocks[0], io::FileBlock::Range(offset, offset + 
99999),
+                     io::FileBlock::State::DOWNLOADED);
+
+        blocks.clear();
+    }
+    ASSERT_EQ(cache.get_stats_unsafe()["disposable_queue_curr_size"], 1500000); // 1500000 + 3300000 + 5200000 == capacity; all inserted bytes are still accounted for
+    ASSERT_EQ(cache.get_stats_unsafe()["ttl_queue_curr_size"], 5200000);
+    ASSERT_EQ(cache.get_stats_unsafe()["index_queue_curr_size"], 0);
+    ASSERT_EQ(cache.get_stats_unsafe()["normal_queue_curr_size"], 3300000);
+
+    // our hero comes to the stage: INDEX blocks are requested while the other three queues already add up to capacity
+    io::CacheContext context4;
+    context4.cache_type = io::FileCacheType::INDEX;
+    context4.query_id = query_id;
+    auto key4 = io::BlockFileCache::hash("key4");
+
+    offset = 0;
+
+    for (; offset < 1000000; offset += 100000) { // 1000000 bytes == index_queue_size
+        auto holder = cache.get_or_set(key4, offset, 100000, context4);
+        auto blocks = fromHolder(holder);
+        ASSERT_EQ(blocks.size(), 1);
+
+        assert_range(1, blocks[0], io::FileBlock::Range(offset, offset + 
99999),
+                     io::FileBlock::State::EMPTY);
+        ASSERT_TRUE(blocks[0]->get_or_set_downloader() == 
io::FileBlock::get_caller_id());
+        download(blocks[0]);
+        assert_range(2, blocks[0], io::FileBlock::Range(offset, offset + 
99999),
+                     io::FileBlock::State::DOWNLOADED);
+
+        blocks.clear();
+    }
+    ASSERT_EQ(cache.get_stats_unsafe()["disposable_queue_curr_size"], 1000000); // expected sizes now equal the configured limits (3000000 matches query_queue_size)
+    ASSERT_EQ(cache.get_stats_unsafe()["ttl_queue_curr_size"], 5000000);
+    ASSERT_EQ(cache.get_stats_unsafe()["index_queue_curr_size"], 1000000);
+    ASSERT_EQ(cache.get_stats_unsafe()["normal_queue_curr_size"], 3000000);
+    // Expected counters: 500000 + 300000 + 200000 == the 1000000 bytes just inserted
ASSERT_EQ(cache._evict_by_size_metrics_matrix[FileCacheType::DISPOSABLE][FileCacheType::INDEX]
+                      ->get_value(),
+              500000);
+    
ASSERT_EQ(cache._evict_by_size_metrics_matrix[FileCacheType::NORMAL][FileCacheType::INDEX]
+                      ->get_value(),
+              300000);
+    
ASSERT_EQ(cache._evict_by_size_metrics_matrix[FileCacheType::TTL][FileCacheType::INDEX]
+                      ->get_value(),
+              200000);
+
+    size_t exceed = 200000; // 200000 extra bytes == two more 100000-byte blocks
+    for (; offset < (1000000 + exceed); offset += 100000) {
+        auto holder = cache.get_or_set(key4, offset, 100000, context4);
+        auto blocks = fromHolder(holder);
+        ASSERT_EQ(blocks.size(), 1);
+
+        assert_range(3, blocks[0], io::FileBlock::Range(offset, offset + 
99999),
+                     io::FileBlock::State::EMPTY);
+        ASSERT_TRUE(blocks[0]->get_or_set_downloader() == 
io::FileBlock::get_caller_id());
+        download(blocks[0]);
+        assert_range(4, blocks[0], io::FileBlock::Range(offset, offset + 
99999),
+                     io::FileBlock::State::DOWNLOADED);
+
+        blocks.clear();
+    }
+    ASSERT_EQ(cache.get_stats_unsafe()["disposable_queue_curr_size"], 1000000); // queue sizes are expected to be unchanged by the overflow
+    ASSERT_EQ(cache.get_stats_unsafe()["ttl_queue_curr_size"], 5000000);
+    ASSERT_EQ(cache.get_stats_unsafe()["index_queue_curr_size"], 1000000);
+    ASSERT_EQ(cache.get_stats_unsafe()["normal_queue_curr_size"], 3000000);
+    // The three cross-queue counters are expected to keep their earlier values...
ASSERT_EQ(cache._evict_by_size_metrics_matrix[FileCacheType::DISPOSABLE][FileCacheType::INDEX]
+                      ->get_value(),
+              500000);
+    
ASSERT_EQ(cache._evict_by_size_metrics_matrix[FileCacheType::NORMAL][FileCacheType::INDEX]
+                      ->get_value(),
+              300000);
+    
ASSERT_EQ(cache._evict_by_size_metrics_matrix[FileCacheType::TTL][FileCacheType::INDEX]
+                      ->get_value(),
+              200000);
+    // ...while the INDEX self-LRU counter is expected to equal the overflow
ASSERT_EQ(cache._evict_by_self_lru_metrics_matrix[FileCacheType::INDEX]->get_value(),
 exceed);
+
+    if (fs::exists(cache_base_path)) { // clean up the cache directory
+        fs::remove_all(cache_base_path);
+    }
+}
+
+TEST_F(BlockFileCacheTest, evict_privilege_order_for_ttl) { // TTL is the late arrival: it is inserted after DISPOSABLE, INDEX and NORMAL already fill the cache
+    if (fs::exists(cache_base_path)) { // wipe any leftover cache directory
+        fs::remove_all(cache_base_path);
+    }
+    fs::create_directories(cache_base_path);
+    TUniqueId query_id;
+    query_id.hi = 1;
+    query_id.lo = 1;
+    io::FileCacheSettings settings;
+    // Queue limits: the four *_queue_size values below add up to capacity (10000000)
+    settings.ttl_queue_size = 5000000;
+    settings.ttl_queue_elements = 50000;
+    settings.query_queue_size = 3000000;
+    settings.query_queue_elements = 30000;
+    settings.index_queue_size = 1000000;
+    settings.index_queue_elements = 10000;
+    settings.disposable_queue_size = 1000000;
+    settings.disposable_queue_elements = 10000;
+    settings.capacity = 10000000;
+    settings.max_file_block_size = 100000; // same as the request size used in every loop below
+    settings.max_query_cache_size = 30;
+
+    io::BlockFileCache cache(cache_base_path, settings);
+    ASSERT_TRUE(cache.initialize());
+    int i = 0; // poll for the async open: at most 100 tries, 1 ms apart
+    for (; i < 100; i++) {
+        if (cache.get_async_open_success()) {
+            break;
+        }
+        std::this_thread::sleep_for(std::chrono::milliseconds(1));
+    }
+    ASSERT_TRUE(cache.get_async_open_success());
+    // Stage 1: 1500000 bytes of DISPOSABLE blocks under key1
+    io::CacheContext context1;
+    context1.cache_type = io::FileCacheType::DISPOSABLE;
+    context1.query_id = query_id;
+    auto key1 = io::BlockFileCache::hash("key1");
+
+    int64_t offset = 0;
+
+    for (; offset < 1500000; offset += 100000) {
+        auto holder = cache.get_or_set(key1, offset, 100000, context1);
+        auto blocks = fromHolder(holder);
+        ASSERT_EQ(blocks.size(), 1); // each 100000-byte request yields a single block
+
+        assert_range(1, blocks[0], io::FileBlock::Range(offset, offset + 
99999),
+                     io::FileBlock::State::EMPTY); // expected state before the download
+        ASSERT_TRUE(blocks[0]->get_or_set_downloader() == 
io::FileBlock::get_caller_id());
+        download(blocks[0]); // helper defined outside this chunk
+        assert_range(2, blocks[0], io::FileBlock::Range(offset, offset + 
99999),
+                     io::FileBlock::State::DOWNLOADED); // expected state after the download
+
+        blocks.clear();
+    }
+    io::CacheContext context2; // Stage 2: 1300000 bytes of INDEX blocks under key2
+    context2.cache_type = io::FileCacheType::INDEX;
+    context2.query_id = query_id;
+    auto key2 = io::BlockFileCache::hash("key2");
+
+    offset = 0;
+
+    for (; offset < 1300000; offset += 100000) {
+        auto holder = cache.get_or_set(key2, offset, 100000, context2);
+        auto blocks = fromHolder(holder);
+        ASSERT_EQ(blocks.size(), 1);
+
+        assert_range(1, blocks[0], io::FileBlock::Range(offset, offset + 
99999),
+                     io::FileBlock::State::EMPTY);
+        ASSERT_TRUE(blocks[0]->get_or_set_downloader() == 
io::FileBlock::get_caller_id());
+        download(blocks[0]);
+        assert_range(2, blocks[0], io::FileBlock::Range(offset, offset + 
99999),
+                     io::FileBlock::State::DOWNLOADED);
+
+        blocks.clear();
+    }
+    io::CacheContext context3; // Stage 3: 7200000 bytes of NORMAL blocks under key3
+    context3.cache_type = io::FileCacheType::NORMAL;
+    context3.query_id = query_id;
+    auto key3 = io::BlockFileCache::hash("key3");
+
+    offset = 0;
+
+    for (; offset < 7200000; offset += 100000) {
+        auto holder = cache.get_or_set(key3, offset, 100000, context3);
+        auto blocks = fromHolder(holder);
+        ASSERT_EQ(blocks.size(), 1);
+
+        assert_range(1, blocks[0], io::FileBlock::Range(offset, offset + 
99999),
+                     io::FileBlock::State::EMPTY);
+        ASSERT_TRUE(blocks[0]->get_or_set_downloader() == 
io::FileBlock::get_caller_id());
+        download(blocks[0]);
+        assert_range(2, blocks[0], io::FileBlock::Range(offset, offset + 
99999),
+                     io::FileBlock::State::DOWNLOADED);
+
+        blocks.clear();
+    }
+    ASSERT_EQ(cache.get_stats_unsafe()["disposable_queue_curr_size"], 1500000); // 1500000 + 1300000 + 7200000 == capacity; NORMAL is expected well above query_queue_size here
+    ASSERT_EQ(cache.get_stats_unsafe()["ttl_queue_curr_size"], 0);
+    ASSERT_EQ(cache.get_stats_unsafe()["index_queue_curr_size"], 1300000);
+    ASSERT_EQ(cache.get_stats_unsafe()["normal_queue_curr_size"], 7200000);
+
+    // our hero comes to the stage: TTL blocks are requested while the other three queues already add up to capacity
+    io::CacheContext context4;
+    context4.cache_type = io::FileCacheType::TTL;
+    context4.query_id = query_id;
+    context4.expiration_time = UnixSeconds() + 120; // 120 past the current UnixSeconds() value
+    auto key4 = io::BlockFileCache::hash("key4");
+
+    offset = 0;
+
+    for (; offset < 5000000; offset += 100000) { // 5000000 bytes == ttl_queue_size
+        auto holder = cache.get_or_set(key4, offset, 100000, context4);
+        auto blocks = fromHolder(holder);
+        ASSERT_EQ(blocks.size(), 1);
+
+        assert_range(1, blocks[0], io::FileBlock::Range(offset, offset + 
99999),
+                     io::FileBlock::State::EMPTY);
+        ASSERT_TRUE(blocks[0]->get_or_set_downloader() == 
io::FileBlock::get_caller_id());
+        download(blocks[0]);
+        assert_range(2, blocks[0], io::FileBlock::Range(offset, offset + 
99999),
+                     io::FileBlock::State::DOWNLOADED);
+
+        blocks.clear();
+    }
+    ASSERT_EQ(cache.get_stats_unsafe()["disposable_queue_curr_size"], 1000000); // expected sizes now equal the configured limits (3000000 matches query_queue_size)
+    ASSERT_EQ(cache.get_stats_unsafe()["ttl_queue_curr_size"], 5000000);
+    ASSERT_EQ(cache.get_stats_unsafe()["index_queue_curr_size"], 1000000);
+    ASSERT_EQ(cache.get_stats_unsafe()["normal_queue_curr_size"], 3000000);
+    // Expected counters: 500000 + 300000 + 4200000 == the 5000000 bytes just inserted
ASSERT_EQ(cache._evict_by_size_metrics_matrix[FileCacheType::DISPOSABLE][FileCacheType::TTL]
+                      ->get_value(),
+              500000);
+    
ASSERT_EQ(cache._evict_by_size_metrics_matrix[FileCacheType::INDEX][FileCacheType::TTL]
+                      ->get_value(),
+              300000);
+    
ASSERT_EQ(cache._evict_by_size_metrics_matrix[FileCacheType::NORMAL][FileCacheType::TTL]
+                      ->get_value(),
+              4200000);
+
+    size_t exceed = 200000; // 200000 extra bytes == two more 100000-byte blocks
+    for (; offset < (5000000 + exceed); offset += 100000) {
+        auto holder = cache.get_or_set(key4, offset, 100000, context4);
+        auto blocks = fromHolder(holder);
+        ASSERT_EQ(blocks.size(), 1);
+
+        assert_range(3, blocks[0], io::FileBlock::Range(offset, offset + 
99999),
+                     io::FileBlock::State::EMPTY);
+        ASSERT_TRUE(blocks[0]->get_or_set_downloader() == 
io::FileBlock::get_caller_id());
+        download(blocks[0]);
+        assert_range(4, blocks[0], io::FileBlock::Range(offset, offset + 
99999),
+                     io::FileBlock::State::DOWNLOADED);
+
+        blocks.clear();
+    }
+    ASSERT_EQ(cache.get_stats_unsafe()["disposable_queue_curr_size"], 1000000); // queue sizes are expected to be unchanged by the overflow
+    ASSERT_EQ(cache.get_stats_unsafe()["ttl_queue_curr_size"], 5000000);
+    ASSERT_EQ(cache.get_stats_unsafe()["index_queue_curr_size"], 1000000);
+    ASSERT_EQ(cache.get_stats_unsafe()["normal_queue_curr_size"], 3000000);
+    // The three cross-queue counters are expected to keep their earlier values...
ASSERT_EQ(cache._evict_by_size_metrics_matrix[FileCacheType::DISPOSABLE][FileCacheType::TTL]
+                      ->get_value(),
+              500000);
+    
ASSERT_EQ(cache._evict_by_size_metrics_matrix[FileCacheType::INDEX][FileCacheType::TTL]
+                      ->get_value(),
+              300000);
+    
ASSERT_EQ(cache._evict_by_size_metrics_matrix[FileCacheType::NORMAL][FileCacheType::TTL]
+                      ->get_value(),
+              4200000);
+    // ...while the TTL self-LRU counter is expected to equal the overflow
ASSERT_EQ(cache._evict_by_self_lru_metrics_matrix[FileCacheType::TTL]->get_value(),
 exceed);
+
+    if (fs::exists(cache_base_path)) { // clean up the cache directory
+        fs::remove_all(cache_base_path);
+    }
+}
+
 } // namespace doris::io


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to