This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 85f37257c22 [feature](Cache) Limit cache usage of TTL  (#34084)
85f37257c22 is described below

commit 85f37257c2238ebc4e69ada3acc7ac089f847dfb
Author: AlexYue <[email protected]>
AuthorDate: Thu Apr 25 20:39:25 2024 +0800

    [feature](Cache) Limit cache usage of TTL  (#34084)
---
 be/src/common/config.cpp                   |  2 +
 be/src/common/config.h                     |  2 +
 be/src/io/cache/block_file_cache.cpp       | 13 ++++-
 be/src/io/cache/block_file_cache.h         |  1 +
 be/src/io/cache/file_block.cpp             |  5 ++
 be/src/io/cache/file_block.h               |  2 +
 be/test/io/cache/block_file_cache_test.cpp | 79 ++++++++++++++++++++++++------
 7 files changed, 89 insertions(+), 15 deletions(-)

diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 11a8b97842d..e1c8114800a 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -1226,6 +1226,8 @@ 
DEFINE_Int64(num_buffered_reader_prefetch_thread_pool_max_thread, "64");
 DEFINE_Int64(num_s3_file_upload_thread_pool_min_thread, "16");
 // The max thread num for S3FileUploadThreadPool
 DEFINE_Int64(num_s3_file_upload_thread_pool_max_thread, "64");
+// Max share of the file cache capacity, in percent (0-100), that TTL blocks may occupy
+DEFINE_mInt64(max_ttl_cache_ratio, "90");
 
 // clang-format off
 #ifdef BE_TEST
diff --git a/be/src/common/config.h b/be/src/common/config.h
index fe7009c7691..ce244da36d5 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -1306,6 +1306,8 @@ 
DECLARE_Int64(num_buffered_reader_prefetch_thread_pool_max_thread);
 DECLARE_Int64(num_s3_file_upload_thread_pool_min_thread);
 // The max thread num for S3FileUploadThreadPool
 DECLARE_Int64(num_s3_file_upload_thread_pool_max_thread);
+// Max share of the file cache capacity, in percent (0-100), that TTL blocks may occupy
+DECLARE_mInt64(max_ttl_cache_ratio);
 
 #ifdef BE_TEST
 // test s3
diff --git a/be/src/io/cache/block_file_cache.cpp 
b/be/src/io/cache/block_file_cache.cpp
index b41cb9f6a5f..26ca8e47596 100644
--- a/be/src/io/cache/block_file_cache.cpp
+++ b/be/src/io/cache/block_file_cache.cpp
@@ -623,7 +623,9 @@ BlockFileCache::FileBlockCell* 
BlockFileCache::add_cell(const UInt128Wrapper& ha
 
     auto& offsets = _files[hash];
     DCHECK((context.expiration_time == 0 && context.cache_type != 
FileCacheType::TTL) ||
-           (context.cache_type == FileCacheType::TTL && 
context.expiration_time != 0));
+           (context.cache_type == FileCacheType::TTL && 
context.expiration_time != 0))
+            << fmt::format("expiration time {}, cache type {}", 
context.expiration_time,
+                           context.cache_type);
 
     FileCacheKey key;
     key.hash = hash;
@@ -639,6 +641,7 @@ BlockFileCache::FileBlockCell* 
BlockFileCache::add_cell(const UInt128Wrapper& ha
             _key_to_time[hash] = context.expiration_time;
             _time_to_key.insert(std::make_pair(context.expiration_time, hash));
         }
+        _cur_ttl_size += cell.size();
     }
     auto [it, _] = offsets.insert(std::make_pair(offset, std::move(cell)));
     _cur_cache_size += size;
@@ -695,6 +698,10 @@ const BlockFileCache::LRUQueue& 
BlockFileCache::get_queue(FileCacheType type) co
 bool BlockFileCache::try_reserve_for_ttl(size_t size, 
std::lock_guard<std::mutex>& cache_lock) {
     size_t removed_size = 0;
     size_t cur_cache_size = _cur_cache_size;
+    auto limit = config::max_ttl_cache_ratio * _capacity;
+    if ((_cur_ttl_size + size) * 100 > limit) {
+        return false;
+    }
     auto is_overflow = [&] {
         return _disk_resource_limit_mode ? removed_size < size
                                          : cur_cache_size + size - 
removed_size > _capacity;
@@ -1129,6 +1136,9 @@ void BlockFileCache::remove(FileBlockSPtr file_block, T& 
cache_lock, U& block_lo
         }
     }
     _cur_cache_size -= file_block->range().size();
+    if (FileCacheType::TTL == type) {
+        _cur_ttl_size -= file_block->range().size();
+    }
     auto& offsets = _files[hash];
     offsets.erase(file_block->offset());
     if (offsets.empty()) {
@@ -1544,6 +1554,7 @@ std::string BlockFileCache::clear_file_cache_directly() {
     int64_t disposible_queue_size = 
_disposable_queue.get_elements_num(cache_lock);
     _files.clear();
     _cur_cache_size = 0;
+    _cur_ttl_size = 0;
     _time_to_key.clear();
     _key_to_time.clear();
     _index_queue.clear(cache_lock);
diff --git a/be/src/io/cache/block_file_cache.h 
b/be/src/io/cache/block_file_cache.h
index 282148aa566..f086c2c680e 100644
--- a/be/src/io/cache/block_file_cache.h
+++ b/be/src/io/cache/block_file_cache.h
@@ -394,6 +394,7 @@ private:
     CachedFiles _files;
     QueryFileCacheContextMap _query_map;
     size_t _cur_cache_size = 0;
+    size_t _cur_ttl_size = 0;
     std::multimap<uint64_t, UInt128Wrapper> _time_to_key;
     std::unordered_map<UInt128Wrapper, uint64_t, KeyHash> _key_to_time;
     // The three queues are level queue.
diff --git a/be/src/io/cache/file_block.cpp b/be/src/io/cache/file_block.cpp
index 2efc26fb1a6..5985aa95f7a 100644
--- a/be/src/io/cache/file_block.cpp
+++ b/be/src/io/cache/file_block.cpp
@@ -30,6 +30,11 @@
 namespace doris {
 namespace io {
 
+std::ostream& operator<<(std::ostream& os, const FileBlock::State& value) {
+    os << FileBlock::state_to_string(value);
+    return os;
+}
+
 FileBlock::FileBlock(const FileCacheKey& key, size_t size, BlockFileCache* mgr,
                      State download_state)
         : _block_range(key.offset, key.offset + size - 1),
diff --git a/be/src/io/cache/file_block.h b/be/src/io/cache/file_block.h
index dd4ef375707..2587cd8607f 100644
--- a/be/src/io/cache/file_block.h
+++ b/be/src/io/cache/file_block.h
@@ -154,6 +154,8 @@ private:
     size_t _downloaded_size {0};
 };
 
+extern std::ostream& operator<<(std::ostream& os, const FileBlock::State& 
value);
+
 using FileBlockSPtr = std::shared_ptr<FileBlock>;
 using FileBlocks = std::list<FileBlockSPtr>;
 
diff --git a/be/test/io/cache/block_file_cache_test.cpp 
b/be/test/io/cache/block_file_cache_test.cpp
index 6b1139d1b41..64778b396a2 100644
--- a/be/test/io/cache/block_file_cache_test.cpp
+++ b/be/test/io/cache/block_file_cache_test.cpp
@@ -679,6 +679,59 @@ TEST_F(BlockFileCacheTest, resize) {
     }
 }
 
+TEST_F(BlockFileCacheTest, max_ttl_size) {
+    if (fs::exists(cache_base_path)) {
+        fs::remove_all(cache_base_path);
+    }
+    fs::create_directories(cache_base_path);
+    TUniqueId query_id;
+    query_id.hi = 1;
+    query_id.lo = 1;
+    io::FileCacheSettings settings;
+    settings.query_queue_size = 100000000;
+    settings.query_queue_elements = 100000;
+    settings.capacity = 100000000;
+    settings.max_file_block_size = 100000;
+    settings.max_query_cache_size = 30;
+    io::CacheContext context;
+    context.cache_type = io::FileCacheType::TTL;
+    context.query_id = query_id;
+    int64_t cur_time = UnixSeconds();
+    context.expiration_time = cur_time + 120;
+    auto key1 = io::BlockFileCache::hash("key5");
+    io::BlockFileCache cache(cache_base_path, settings);
+    ASSERT_TRUE(cache.initialize());
+    int i = 0;
+    for (; i < 100; i++) {
+        if (cache.get_lazy_open_success()) {
+            break;
+        }
+        std::this_thread::sleep_for(std::chrono::milliseconds(1));
+    }
+    ASSERT_TRUE(cache.get_lazy_open_success());
+    int64_t offset = 0;
+    for (; offset < 100000000; offset += 100000) {
+        auto holder = cache.get_or_set(key1, offset, 100000, context);
+        auto blocks = fromHolder(holder);
+        ASSERT_EQ(blocks.size(), 1);
+        if (offset < 90000000) {
+            assert_range(1, blocks[0], io::FileBlock::Range(offset, offset + 
99999),
+                         io::FileBlock::State::EMPTY);
+            ASSERT_TRUE(blocks[0]->get_or_set_downloader() == 
io::FileBlock::get_caller_id());
+            download(blocks[0]);
+            assert_range(1, blocks[0], io::FileBlock::Range(offset, offset + 
99999),
+                         io::FileBlock::State::DOWNLOADED);
+        } else {
+            assert_range(1, blocks[0], io::FileBlock::Range(offset, offset + 
99999),
+                         io::FileBlock::State::SKIP_CACHE);
+        }
+        blocks.clear();
+    }
+    if (fs::exists(cache_base_path)) {
+        fs::remove_all(cache_base_path);
+    }
+}
+
 TEST_F(BlockFileCacheTest, query_limit_heap_use_after_free) {
     if (fs::exists(cache_base_path)) {
         fs::remove_all(cache_base_path);
@@ -1773,10 +1826,10 @@ TEST_F(BlockFileCacheTest, ttl_reverse) {
     query_id.hi = 1;
     query_id.lo = 1;
     io::FileCacheSettings settings;
-    settings.query_queue_size = 30;
+    settings.query_queue_size = 36;
     settings.query_queue_elements = 5;
-    settings.capacity = 30;
-    settings.max_file_block_size = 5;
+    settings.capacity = 36;
+    settings.max_file_block_size = 7;
     settings.max_query_cache_size = 30;
     io::CacheContext context;
     context.cache_type = io::FileCacheType::TTL;
@@ -1792,25 +1845,23 @@ TEST_F(BlockFileCacheTest, ttl_reverse) {
         };
         std::this_thread::sleep_for(std::chrono::milliseconds(1));
     }
-    {
-        auto holder = cache.get_or_set(key2, 0, 30, context); /// Add range 
[0, 29]
+    ASSERT_TRUE(cache.get_lazy_open_success());
+    for (size_t offset = 0; offset < 30; offset += 6) {
+        auto holder = cache.get_or_set(key2, offset, 6, context);
         auto blocks = fromHolder(holder);
-        for (auto& block : blocks) {
-            ASSERT_TRUE(block->get_or_set_downloader() == 
io::FileBlock::get_caller_id());
-            download(block);
-        }
-        EXPECT_EQ(blocks.size(), 6);
+        ASSERT_TRUE(blocks[0]->get_or_set_downloader() == 
io::FileBlock::get_caller_id());
+        download(blocks[0]);
     }
     {
-        auto holder = cache.get_or_set(key2, 50, 5, context); /// Add range 
[50, 54]
+        auto holder = cache.get_or_set(key2, 50, 7, context); /// Add range 
[50, 56]
         auto blocks = fromHolder(holder);
-        assert_range(1, blocks[0], io::FileBlock::Range(50, 54), 
io::FileBlock::State::SKIP_CACHE);
+        assert_range(1, blocks[0], io::FileBlock::Range(50, 56), 
io::FileBlock::State::SKIP_CACHE);
     }
     {
         context.cache_type = io::FileCacheType::NORMAL;
-        auto holder = cache.get_or_set(key2, 50, 5, context); /// Add range 
[50, 54]
+        auto holder = cache.get_or_set(key2, 50, 7, context); /// Add range 
[50, 56]
         auto blocks = fromHolder(holder);
-        assert_range(1, blocks[0], io::FileBlock::Range(50, 54), 
io::FileBlock::State::SKIP_CACHE);
+        assert_range(1, blocks[0], io::FileBlock::Range(50, 56), 
io::FileBlock::State::SKIP_CACHE);
     }
 
     if (fs::exists(cache_base_path)) {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to