This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 85f37257c22 [feature](Cache) Limit cache usage of TTL (#34084)
85f37257c22 is described below
commit 85f37257c2238ebc4e69ada3acc7ac089f847dfb
Author: AlexYue <[email protected]>
AuthorDate: Thu Apr 25 20:39:25 2024 +0800
[feature](Cache) Limit cache usage of TTL (#34084)
---
be/src/common/config.cpp | 2 +
be/src/common/config.h | 2 +
be/src/io/cache/block_file_cache.cpp | 13 ++++-
be/src/io/cache/block_file_cache.h | 1 +
be/src/io/cache/file_block.cpp | 5 ++
be/src/io/cache/file_block.h | 2 +
be/test/io/cache/block_file_cache_test.cpp | 79 ++++++++++++++++++++++++------
7 files changed, 89 insertions(+), 15 deletions(-)
diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 11a8b97842d..e1c8114800a 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -1226,6 +1226,8 @@
DEFINE_Int64(num_buffered_reader_prefetch_thread_pool_max_thread, "64");
DEFINE_Int64(num_s3_file_upload_thread_pool_min_thread, "16");
// The max thread num for S3FileUploadThreadPool
DEFINE_Int64(num_s3_file_upload_thread_pool_max_thread, "64");
+// The max percentage of the total file cache capacity that TTL cache data may occupy
+DEFINE_mInt64(max_ttl_cache_ratio, "90");
// clang-format off
#ifdef BE_TEST
diff --git a/be/src/common/config.h b/be/src/common/config.h
index fe7009c7691..ce244da36d5 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -1306,6 +1306,8 @@
DECLARE_Int64(num_buffered_reader_prefetch_thread_pool_max_thread);
DECLARE_Int64(num_s3_file_upload_thread_pool_min_thread);
// The max thread num for S3FileUploadThreadPool
DECLARE_Int64(num_s3_file_upload_thread_pool_max_thread);
+// The max percentage of the total file cache capacity that TTL cache data may occupy
+DECLARE_mInt64(max_ttl_cache_ratio);
#ifdef BE_TEST
// test s3
diff --git a/be/src/io/cache/block_file_cache.cpp
b/be/src/io/cache/block_file_cache.cpp
index b41cb9f6a5f..26ca8e47596 100644
--- a/be/src/io/cache/block_file_cache.cpp
+++ b/be/src/io/cache/block_file_cache.cpp
@@ -623,7 +623,9 @@ BlockFileCache::FileBlockCell*
BlockFileCache::add_cell(const UInt128Wrapper& ha
auto& offsets = _files[hash];
DCHECK((context.expiration_time == 0 && context.cache_type !=
FileCacheType::TTL) ||
- (context.cache_type == FileCacheType::TTL &&
context.expiration_time != 0));
+ (context.cache_type == FileCacheType::TTL &&
context.expiration_time != 0))
+ << fmt::format("expiration time {}, cache type {}",
context.expiration_time,
+ context.cache_type);
FileCacheKey key;
key.hash = hash;
@@ -639,6 +641,7 @@ BlockFileCache::FileBlockCell*
BlockFileCache::add_cell(const UInt128Wrapper& ha
_key_to_time[hash] = context.expiration_time;
_time_to_key.insert(std::make_pair(context.expiration_time, hash));
}
+ _cur_ttl_size += cell.size();
}
auto [it, _] = offsets.insert(std::make_pair(offset, std::move(cell)));
_cur_cache_size += size;
@@ -695,6 +698,10 @@ const BlockFileCache::LRUQueue&
BlockFileCache::get_queue(FileCacheType type) co
bool BlockFileCache::try_reserve_for_ttl(size_t size,
std::lock_guard<std::mutex>& cache_lock) {
size_t removed_size = 0;
size_t cur_cache_size = _cur_cache_size;
+ auto limit = config::max_ttl_cache_ratio * _capacity;
+ if ((_cur_ttl_size + size) * 100 > limit) {
+ return false;
+ }
auto is_overflow = [&] {
return _disk_resource_limit_mode ? removed_size < size
: cur_cache_size + size -
removed_size > _capacity;
@@ -1129,6 +1136,9 @@ void BlockFileCache::remove(FileBlockSPtr file_block, T&
cache_lock, U& block_lo
}
}
_cur_cache_size -= file_block->range().size();
+ if (FileCacheType::TTL == type) {
+ _cur_ttl_size -= file_block->range().size();
+ }
auto& offsets = _files[hash];
offsets.erase(file_block->offset());
if (offsets.empty()) {
@@ -1544,6 +1554,7 @@ std::string BlockFileCache::clear_file_cache_directly() {
int64_t disposible_queue_size =
_disposable_queue.get_elements_num(cache_lock);
_files.clear();
_cur_cache_size = 0;
+ _cur_ttl_size = 0;
_time_to_key.clear();
_key_to_time.clear();
_index_queue.clear(cache_lock);
diff --git a/be/src/io/cache/block_file_cache.h
b/be/src/io/cache/block_file_cache.h
index 282148aa566..f086c2c680e 100644
--- a/be/src/io/cache/block_file_cache.h
+++ b/be/src/io/cache/block_file_cache.h
@@ -394,6 +394,7 @@ private:
CachedFiles _files;
QueryFileCacheContextMap _query_map;
size_t _cur_cache_size = 0;
+ size_t _cur_ttl_size = 0;
std::multimap<uint64_t, UInt128Wrapper> _time_to_key;
std::unordered_map<UInt128Wrapper, uint64_t, KeyHash> _key_to_time;
// The three queues are level queue.
diff --git a/be/src/io/cache/file_block.cpp b/be/src/io/cache/file_block.cpp
index 2efc26fb1a6..5985aa95f7a 100644
--- a/be/src/io/cache/file_block.cpp
+++ b/be/src/io/cache/file_block.cpp
@@ -30,6 +30,11 @@
namespace doris {
namespace io {
+std::ostream& operator<<(std::ostream& os, const FileBlock::State& value) { // Stream inserter for FileBlock::State.
+ os << FileBlock::state_to_string(value); // Writes the string returned by FileBlock::state_to_string(value).
+ return os; // Returns the same stream so insertions can be chained.
+}
+
FileBlock::FileBlock(const FileCacheKey& key, size_t size, BlockFileCache* mgr,
State download_state)
: _block_range(key.offset, key.offset + size - 1),
diff --git a/be/src/io/cache/file_block.h b/be/src/io/cache/file_block.h
index dd4ef375707..2587cd8607f 100644
--- a/be/src/io/cache/file_block.h
+++ b/be/src/io/cache/file_block.h
@@ -154,6 +154,8 @@ private:
size_t _downloaded_size {0};
};
+extern std::ostream& operator<<(std::ostream& os, const FileBlock::State&
value);
+
using FileBlockSPtr = std::shared_ptr<FileBlock>;
using FileBlocks = std::list<FileBlockSPtr>;
diff --git a/be/test/io/cache/block_file_cache_test.cpp
b/be/test/io/cache/block_file_cache_test.cpp
index 6b1139d1b41..64778b396a2 100644
--- a/be/test/io/cache/block_file_cache_test.cpp
+++ b/be/test/io/cache/block_file_cache_test.cpp
@@ -679,6 +679,59 @@ TEST_F(BlockFileCacheTest, resize) {
}
}
+TEST_F(BlockFileCacheTest, max_ttl_size) { // Checks that TTL blocks are cached only for the first 90% of capacity; later requests get SKIP_CACHE.
+ if (fs::exists(cache_base_path)) { // Start from an empty cache directory.
+ fs::remove_all(cache_base_path);
+ }
+ fs::create_directories(cache_base_path);
+ TUniqueId query_id;
+ query_id.hi = 1;
+ query_id.lo = 1;
+ io::FileCacheSettings settings;
+ settings.query_queue_size = 100000000;
+ settings.query_queue_elements = 100000;
+ settings.capacity = 100000000; // Total capacity; the 90000000 threshold below is 90% of this value.
+ settings.max_file_block_size = 100000; // Same size as each request issued in the loop below.
+ settings.max_query_cache_size = 30;
+ io::CacheContext context;
+ context.cache_type = io::FileCacheType::TTL; // Every request in this test goes through the TTL cache type.
+ context.query_id = query_id;
+ int64_t cur_time = UnixSeconds();
+ context.expiration_time = cur_time + 120; // Non-zero expiration time, set 120 past the current UnixSeconds() value.
+ auto key1 = io::BlockFileCache::hash("key5");
+ io::BlockFileCache cache(cache_base_path, settings);
+ ASSERT_TRUE(cache.initialize());
+ int i = 0;
+ for (; i < 100; i++) { // Poll up to 100 times, sleeping 1 ms between checks, until lazy open reports success.
+ if (cache.get_lazy_open_success()) {
+ break;
+ }
+ std::this_thread::sleep_for(std::chrono::milliseconds(1));
+ }
+ ASSERT_TRUE(cache.get_lazy_open_success());
+ int64_t offset = 0;
+ for (; offset < 100000000; offset += 100000) { // Request consecutive 100000-byte blocks over [0, 100000000).
+ auto holder = cache.get_or_set(key1, offset, 100000, context);
+ auto blocks = fromHolder(holder);
+ ASSERT_EQ(blocks.size(), 1);
+ if (offset < 90000000) { // Below 90% of capacity: block must be EMPTY, then DOWNLOADED after download(). NOTE(review): presumably relies on the default max_ttl_cache_ratio of 90 - confirm.
+ assert_range(1, blocks[0], io::FileBlock::Range(offset, offset +
99999),
+ io::FileBlock::State::EMPTY);
+ ASSERT_TRUE(blocks[0]->get_or_set_downloader() ==
io::FileBlock::get_caller_id());
+ download(blocks[0]);
+ assert_range(1, blocks[0], io::FileBlock::Range(offset, offset +
99999),
+ io::FileBlock::State::DOWNLOADED);
+ } else { // At or beyond 90000000: the returned block is expected to be SKIP_CACHE.
+ assert_range(1, blocks[0], io::FileBlock::Range(offset, offset +
99999),
+ io::FileBlock::State::SKIP_CACHE);
+ }
+ blocks.clear();
+ }
+ if (fs::exists(cache_base_path)) { // Remove the cache directory created by this test.
+ fs::remove_all(cache_base_path);
+ }
+}
+
TEST_F(BlockFileCacheTest, query_limit_heap_use_after_free) {
if (fs::exists(cache_base_path)) {
fs::remove_all(cache_base_path);
@@ -1773,10 +1826,10 @@ TEST_F(BlockFileCacheTest, ttl_reverse) {
query_id.hi = 1;
query_id.lo = 1;
io::FileCacheSettings settings;
- settings.query_queue_size = 30;
+ settings.query_queue_size = 36;
settings.query_queue_elements = 5;
- settings.capacity = 30;
- settings.max_file_block_size = 5;
+ settings.capacity = 36;
+ settings.max_file_block_size = 7;
settings.max_query_cache_size = 30;
io::CacheContext context;
context.cache_type = io::FileCacheType::TTL;
@@ -1792,25 +1845,23 @@ TEST_F(BlockFileCacheTest, ttl_reverse) {
};
std::this_thread::sleep_for(std::chrono::milliseconds(1));
}
- {
- auto holder = cache.get_or_set(key2, 0, 30, context); /// Add range
[0, 29]
+ ASSERT_TRUE(cache.get_lazy_open_success());
+ for (size_t offset = 0; offset < 30; offset += 6) {
+ auto holder = cache.get_or_set(key2, offset, 6, context);
auto blocks = fromHolder(holder);
- for (auto& block : blocks) {
- ASSERT_TRUE(block->get_or_set_downloader() ==
io::FileBlock::get_caller_id());
- download(block);
- }
- EXPECT_EQ(blocks.size(), 6);
+ ASSERT_TRUE(blocks[0]->get_or_set_downloader() ==
io::FileBlock::get_caller_id());
+ download(blocks[0]);
}
{
- auto holder = cache.get_or_set(key2, 50, 5, context); /// Add range
[50, 54]
+ auto holder = cache.get_or_set(key2, 50, 7, context); /// Add range
[50, 56]
auto blocks = fromHolder(holder);
- assert_range(1, blocks[0], io::FileBlock::Range(50, 54),
io::FileBlock::State::SKIP_CACHE);
+ assert_range(1, blocks[0], io::FileBlock::Range(50, 56),
io::FileBlock::State::SKIP_CACHE);
}
{
context.cache_type = io::FileCacheType::NORMAL;
- auto holder = cache.get_or_set(key2, 50, 5, context); /// Add range
[50, 54]
+ auto holder = cache.get_or_set(key2, 50, 7, context); /// Add range
[50, 56]
auto blocks = fromHolder(holder);
- assert_range(1, blocks[0], io::FileBlock::Range(50, 54),
io::FileBlock::State::SKIP_CACHE);
+ assert_range(1, blocks[0], io::FileBlock::Range(50, 56),
io::FileBlock::State::SKIP_CACHE);
}
if (fs::exists(cache_base_path)) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]