This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new d330f7ba504 (cloud-merge) Support to validate TTL keys (#34142)
d330f7ba504 is described below
commit d330f7ba504106e55358b8403a39020d4a95cae5
Author: Lightman <[email protected]>
AuthorDate: Wed May 1 09:10:28 2024 +0800
(cloud-merge) Support to validate TTL keys (#34142)
Co-authored-by: Yongqiang YANG
<[email protected]>
---
be/src/cloud/cloud_storage_engine.cpp | 35 ++++++++
be/src/cloud/cloud_storage_engine.h | 2 +
be/src/common/config.cpp | 1 +
be/src/common/config.h | 1 +
be/src/io/cache/block_file_cache.cpp | 56 +++++++++---
be/src/io/cache/block_file_cache.h | 2 +
be/test/io/cache/block_file_cache_test.cpp | 137 +++++++++++++++++++++--------
7 files changed, 188 insertions(+), 46 deletions(-)
diff --git a/be/src/cloud/cloud_storage_engine.cpp
b/be/src/cloud/cloud_storage_engine.cpp
index 269ce4f231c..40e8258ec75 100644
--- a/be/src/cloud/cloud_storage_engine.cpp
+++ b/be/src/cloud/cloud_storage_engine.cpp
@@ -24,6 +24,7 @@
#include <rapidjson/prettywriter.h>
#include <rapidjson/stringbuffer.h>
+#include <algorithm>
#include <variant>
#include "cloud/cloud_base_compaction.h"
@@ -37,6 +38,7 @@
#include "cloud/cloud_warm_up_manager.h"
#include "cloud/config.h"
#include "io/cache/block_file_cache_downloader.h"
+#include "io/cache/file_cache_common.h"
#include "io/fs/file_system.h"
#include "io/fs/hdfs_file_system.h"
#include "io/fs/s3_file_system.h"
@@ -274,11 +276,44 @@ Status CloudStorageEngine::start_bg_threads() {
RETURN_IF_ERROR(Thread::create(
"StorageEngine", "lease_compaction_thread",
[this]() { this->_lease_compaction_thread_callback(); },
&_bg_threads.emplace_back()));
+
+ if (config::file_cache_ttl_valid_check_interval_second != 0) {
+ RETURN_IF_ERROR(Thread::create(
+ "StorageEngine", "check_file_cache_ttl_block_valid_thread",
+ [this]() { this->_check_file_cache_ttl_block_valid(); },
+ &_bg_threads.emplace_back()));
+ LOG(INFO) << "check file cache ttl block valid thread started";
+ }
+
LOG(INFO) << "lease compaction thread started";
return Status::OK();
}
+// Body of the "check_file_cache_ttl_block_valid_thread" background thread.
+//
+// On every wake-up it walks all tablets returned by tablet_mgr().get_weak_tablets()
+// and, for each tablet with a non-zero TTL, calls BlockFileCache::update_ttl_atime()
+// for every segment of every rowset whose TTL window
+// (newest_write_timestamp + ttl_seconds) has not yet passed. Rowsets whose TTL has
+// already expired are skipped, so their cached blocks are not refreshed.
+//
+// The loop ends when _stop_background_threads_latch.wait_for() returns true.
+void CloudStorageEngine::_check_file_cache_ttl_block_valid() {
+    // Wake up twice per configured validity window. Clamp to at least one second:
+    // with the config set to 1 the integer division yields 0, and wait_for(0s)
+    // would turn the loop below into a busy spin that never sleeps.
+    const int64_t interval_seconds =
+            std::max<int64_t>(1, config::file_cache_ttl_valid_check_interval_second / 2);
+    auto check_ttl = [](const std::weak_ptr<CloudTablet>& tablet_wk) {
+        auto tablet = tablet_wk.lock();
+        // The tablet may have been released since the weak pointers were collected.
+        if (!tablet) return;
+        // Read the TTL once per tablet so every rowset is judged against the same value.
+        const int64_t ttl_seconds = tablet->tablet_meta()->ttl_seconds();
+        if (ttl_seconds == 0) return;
+        auto rowsets = tablet->get_snapshot_rowset();
+        for (const auto& rowset : rowsets) {
+            // TTL window already over for this rowset: nothing to keep alive.
+            if (rowset->newest_write_timestamp() + ttl_seconds <= UnixSeconds()) continue;
+            for (int64_t seg_id = 0; seg_id < rowset->num_segments(); seg_id++) {
+                auto seg_path = rowset->segment_file_path(seg_id);
+                // The cache key is the hash of the segment file name, not of the full path.
+                auto hash = io::BlockFileCache::hash(io::Path(seg_path).filename().native());
+                auto* file_cache = io::FileCacheFactory::instance()->get_by_path(hash);
+                file_cache->update_ttl_atime(hash);
+            }
+        }
+    };
+    while (!_stop_background_threads_latch.wait_for(std::chrono::seconds(interval_seconds))) {
+        auto weak_tablets = tablet_mgr().get_weak_tablets();
+        std::for_each(weak_tablets.begin(), weak_tablets.end(), check_ttl);
+    }
+}
+
void CloudStorageEngine::sync_storage_vault() {
cloud::StorageVaultInfos vault_infos;
auto st = _meta_mgr->get_storage_vault_info(&vault_infos);
diff --git a/be/src/cloud/cloud_storage_engine.h
b/be/src/cloud/cloud_storage_engine.h
index 6311fa21993..e482a5cff61 100644
--- a/be/src/cloud/cloud_storage_engine.h
+++ b/be/src/cloud/cloud_storage_engine.h
@@ -25,6 +25,7 @@
#include "cloud/cloud_cumulative_compaction_policy.h"
#include "cloud/cloud_tablet.h"
#include "cloud_txn_delete_bitmap_cache.h"
+#include "io/cache/block_file_cache_factory.h"
#include "olap/storage_engine.h"
#include "olap/storage_policy.h"
#include "util/threadpool.h"
@@ -71,6 +72,7 @@ public:
ThreadPool& calc_tablet_delete_bitmap_task_thread_pool() const {
return *_calc_tablet_delete_bitmap_task_thread_pool;
}
+ void _check_file_cache_ttl_block_valid();
io::FileSystemSPtr get_fs_by_vault_id(const std::string& vault_id) const {
if (vault_id.empty()) {
diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index e5686adf76c..b0612958d35 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -990,6 +990,7 @@
DEFINE_mInt32(file_cache_enter_disk_resource_limit_mode_percent, "90");
DEFINE_mInt32(file_cache_exit_disk_resource_limit_mode_percent, "80");
DEFINE_mBool(enable_read_cache_file_directly, "false");
DEFINE_mBool(file_cache_enable_evict_from_other_queue_by_size, "false");
+DEFINE_mInt64(file_cache_ttl_valid_check_interval_second, "0"); // zero for not checking
DEFINE_mInt32(index_cache_entry_stay_time_after_lookup_s, "1800");
DEFINE_mInt32(inverted_index_cache_stale_sweep_time_sec, "600");
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 0e4300c3c97..f2cdc35ed07 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -1034,6 +1034,7 @@
DECLARE_Int32(file_cache_enter_disk_resource_limit_mode_percent);
DECLARE_Int32(file_cache_exit_disk_resource_limit_mode_percent);
DECLARE_mBool(enable_read_cache_file_directly);
DECLARE_Bool(file_cache_enable_evict_from_other_queue_by_size);
+DECLARE_mInt64(file_cache_ttl_valid_check_interval_second);
// inverted index searcher cache
// cache entry stay time after lookup
diff --git a/be/src/io/cache/block_file_cache.cpp
b/be/src/io/cache/block_file_cache.cpp
index b98cd6aec70..b828bbac591 100644
--- a/be/src/io/cache/block_file_cache.cpp
+++ b/be/src/io/cache/block_file_cache.cpp
@@ -30,6 +30,7 @@
#include <chrono> // IWYU pragma: keep
#include <mutex>
+#include <ranges>
#include "common/config.h"
#include "common/logging.h"
@@ -399,7 +400,6 @@ std::string BlockFileCache::clear_file_cache_async() {
_async_clear_file_cache = true;
}
}
- TEST_SYNC_POINT_CALLBACK("BlockFileCache::recycle_deleted_blocks");
std::stringstream ss;
ss << "finish clear_file_cache_async, path=" << _cache_base_path
<< " num_files_all=" << num_files_all << " num_cells_all=" <<
num_cells_all
@@ -410,10 +410,19 @@ std::string BlockFileCache::clear_file_cache_async() {
}
void BlockFileCache::recycle_deleted_blocks() {
+ using namespace std::chrono;
+ static int remove_batch = 100;
+ TEST_SYNC_POINT_CALLBACK("BlockFileCache::set_remove_batch",
&remove_batch);
+ TEST_SYNC_POINT_CALLBACK("BlockFileCache::recycle_deleted_blocks");
std::unique_lock cache_lock(_mutex);
+ auto remove_file_block = [&cache_lock, this](FileBlockCell* cell) {
+ std::lock_guard segment_lock(cell->file_block->_mutex);
+ remove(cell->file_block, cache_lock, segment_lock);
+ };
+ int i = 0;
+ std::condition_variable cond;
+ auto start_time = steady_clock::time_point();
if (_async_clear_file_cache) {
- using namespace std::chrono;
- auto start_time = steady_clock::time_point();
LOG_INFO("Start clear file cache async").tag("path", _cache_base_path);
auto remove_file_block = [&cache_lock, this](FileBlockCell* cell) {
std::lock_guard segment_lock(cell->file_block->_mutex);
@@ -442,7 +451,7 @@ void BlockFileCache::recycle_deleted_blocks() {
cells.push_back(cell);
}
}
- std::for_each(cells.begin(), cells.end(), remove_file_block);
+ std::ranges::for_each(cells, remove_file_block);
// just for sleep
cond.wait_for(cache_lock, std::chrono::microseconds(100));
}
@@ -450,6 +459,8 @@ void BlockFileCache::recycle_deleted_blocks() {
iter_queue(get_queue(FileCacheType::DISPOSABLE));
iter_queue(get_queue(FileCacheType::NORMAL));
iter_queue(get_queue(FileCacheType::INDEX));
+ }
+ if (_async_clear_file_cache ||
config::file_cache_ttl_valid_check_interval_second != 0) {
std::vector<UInt128Wrapper> ttl_keys;
ttl_keys.reserve(_key_to_time.size());
for (auto& [key, _] : _key_to_time) {
@@ -465,6 +476,17 @@ void BlockFileCache::recycle_deleted_blocks() {
std::vector<FileBlockCell*> cells;
cells.reserve(iter->second.size());
for (auto& [_, cell] : iter->second) {
+ cell.is_deleted =
+ cell.is_deleted
+ ? true
+ :
(config::file_cache_ttl_valid_check_interval_second == 0
+ ? false
+ :
std::chrono::duration_cast<std::chrono::seconds>(
+
std::chrono::steady_clock::now()
+
.time_since_epoch())
+
.count() -
+ cell.atime >
+
config::file_cache_ttl_valid_check_interval_second);
if (!cell.is_deleted) {
continue;
} else if (cell.releasable()) {
@@ -472,14 +494,16 @@ void BlockFileCache::recycle_deleted_blocks() {
i++;
}
}
- std::for_each(cells.begin(), cells.end(), remove_file_block);
+ std::ranges::for_each(cells, remove_file_block);
}
}
- _async_clear_file_cache = false;
- auto use_time = duration_cast<milliseconds>(steady_clock::time_point()
- start_time);
- LOG_INFO("End clear file cache async")
- .tag("path", _cache_base_path)
- .tag("use_time", static_cast<int64_t>(use_time.count()));
+ if (_async_clear_file_cache) {
+ _async_clear_file_cache = false;
+ auto use_time =
duration_cast<milliseconds>(steady_clock::time_point() - start_time);
+ LOG_INFO("End clear file cache async")
+ .tag("path", _cache_base_path)
+ .tag("use_time", static_cast<int64_t>(use_time.count()));
+ }
}
}
@@ -1254,6 +1278,9 @@
BlockFileCache::FileBlockCell::FileBlockCell(FileBlockSPtr file_block,
DCHECK(false) << "Can create cell with either EMPTY, DOWNLOADED,
SKIP_CACHE state, got: "
<<
FileBlock::state_to_string(file_block->_download_state);
}
+ if (file_block->cache_type() == FileCacheType::TTL) {
+ update_atime();
+ }
}
BlockFileCache::LRUQueue::Iterator BlockFileCache::LRUQueue::add(
@@ -1644,6 +1671,15 @@ std::map<size_t, FileBlockSPtr>
BlockFileCache::get_blocks_by_key(const UInt128W
return offset_to_block;
}
+// Calls update_atime() on every cell stored in _files under `hash`, while holding
+// the cache-wide _mutex. Does nothing when `hash` has no entry in _files. The cells
+// are not filtered by cache type here.
+void BlockFileCache::update_ttl_atime(const UInt128Wrapper& hash) {
+    std::lock_guard cache_guard(_mutex);
+    auto file_it = _files.find(hash);
+    if (file_it == _files.end()) {
+        return;
+    }
+    for (auto& entry : file_it->second) {
+        entry.second.update_atime();
+    }
+}
+
template void BlockFileCache::remove(FileBlockSPtr file_block,
std::lock_guard<std::mutex>& cache_lock,
std::lock_guard<std::mutex>& block_lock);
diff --git a/be/src/io/cache/block_file_cache.h
b/be/src/io/cache/block_file_cache.h
index 3efe41e6402..86ce1dc1196 100644
--- a/be/src/io/cache/block_file_cache.h
+++ b/be/src/io/cache/block_file_cache.h
@@ -131,6 +131,8 @@ public:
bool try_reserve(const UInt128Wrapper& hash, const CacheContext& context,
size_t offset,
size_t size, std::lock_guard<std::mutex>& cache_lock);
+ void update_ttl_atime(const UInt128Wrapper& hash);
+
class LRUQueue {
public:
LRUQueue() = default;
diff --git a/be/test/io/cache/block_file_cache_test.cpp
b/be/test/io/cache/block_file_cache_test.cpp
index 244f10fa754..362df33aa28 100644
--- a/be/test/io/cache/block_file_cache_test.cpp
+++ b/be/test/io/cache/block_file_cache_test.cpp
@@ -242,7 +242,7 @@ void test_file_cache(io::FileCacheType cache_type) {
io::BlockFileCache mgr(cache_base_path, settings);
ASSERT_TRUE(mgr.initialize().ok());
- while (true) {
+ for (int i = 0; i < 100; i++) {
if (mgr.get_lazy_open_success()) {
break;
};
@@ -580,7 +580,7 @@ void test_file_cache(io::FileCacheType cache_type) {
io::BlockFileCache cache2(cache_base_path, settings);
ASSERT_TRUE(cache2.initialize().ok());
- while (true) {
+ for (int i = 0; i < 100; i++) {
if (cache2.get_lazy_open_success()) {
break;
};
@@ -618,7 +618,7 @@ void test_file_cache(io::FileCacheType cache_type) {
settings2.max_file_block_size = 10;
io::BlockFileCache cache2(cache_path2, settings2);
ASSERT_TRUE(cache2.initialize().ok());
- while (true) {
+ for (int i = 0; i < 100; i++) {
if (cache2.get_lazy_open_success()) {
break;
};
@@ -671,7 +671,7 @@ TEST_F(BlockFileCacheTest, resize) {
settings.max_file_block_size = 100;
io::BlockFileCache cache(cache_base_path, settings);
ASSERT_TRUE(cache.initialize());
- while (true) {
+ for (int i = 0; i < 100; i++) {
if (cache.get_lazy_open_success()) {
break;
};
@@ -753,7 +753,7 @@ TEST_F(BlockFileCacheTest, query_limit_heap_use_after_free)
{
settings.capacity = 15;
io::BlockFileCache cache(cache_base_path, settings);
ASSERT_TRUE(cache.initialize());
- while (true) {
+ for (int i = 0; i < 100; i++) {
if (cache.get_lazy_open_success()) {
break;
};
@@ -838,7 +838,7 @@ TEST_F(BlockFileCacheTest, query_limit_dcheck) {
settings.capacity = 15;
io::BlockFileCache cache(cache_base_path, settings);
ASSERT_TRUE(cache.initialize());
- while (true) {
+ for (int i = 0; i < 100; i++) {
if (cache.get_lazy_open_success()) {
break;
};
@@ -955,7 +955,7 @@ TEST_F(BlockFileCacheTest, reset_range) {
io::BlockFileCache cache(cache_base_path, settings);
EXPECT_EQ(cache.capacity(), 15);
ASSERT_TRUE(cache.initialize());
- while (true) {
+ for (int i = 0; i < 100; i++) {
if (cache.get_lazy_open_success()) {
break;
};
@@ -1005,7 +1005,7 @@ TEST_F(BlockFileCacheTest, change_cache_type) {
settings.capacity = 30;
io::BlockFileCache cache(cache_base_path, settings);
ASSERT_TRUE(cache.initialize());
- while (true) {
+ for (int i = 0; i < 100; i++) {
if (cache.get_lazy_open_success()) {
break;
};
@@ -1055,7 +1055,7 @@ TEST_F(BlockFileCacheTest, fd_cache_remove) {
settings.capacity = 15;
io::BlockFileCache cache(cache_base_path, settings);
ASSERT_TRUE(cache.initialize());
- while (true) {
+ for (int i = 0; i < 100; i++) {
if (cache.get_lazy_open_success()) {
break;
};
@@ -1137,7 +1137,7 @@ TEST_F(BlockFileCacheTest, fd_cache_evict) {
settings.capacity = 15;
io::BlockFileCache cache(cache_base_path, settings);
ASSERT_TRUE(cache.initialize());
- while (true) {
+ for (int i = 0; i < 100; i++) {
if (cache.get_lazy_open_success()) {
break;
};
@@ -1283,7 +1283,7 @@ void
test_file_cache_run_in_resource_limit(io::FileCacheType cache_type) {
io::BlockFileCache cache(cache_base_path, settings);
cache._index_queue.hot_data_interval = 0;
ASSERT_TRUE(cache.initialize());
- while (true) {
+ for (int i = 0; i < 100; i++) {
if (cache.get_lazy_open_success()) {
break;
};
@@ -1465,7 +1465,7 @@ TEST_F(BlockFileCacheTest, test_lazy_load) {
ASSERT_TRUE(blocks[0]->append(result));
ASSERT_TRUE(blocks[0]->finalize());
flag1 = true;
- while (true) {
+ for (int i = 0; i < 100; i++) {
if (cache.get_lazy_open_success()) {
break;
};
@@ -1527,7 +1527,7 @@ TEST_F(BlockFileCacheTest, test_lazy_load_with_limit) {
ASSERT_TRUE(blocks[0]->append(result));
ASSERT_TRUE(blocks[0]->finalize());
flag1 = true;
- while (true) {
+ for (int i = 0; i < 100; i++) {
if (cache.get_lazy_open_success()) {
break;
};
@@ -1570,7 +1570,7 @@ TEST_F(BlockFileCacheTest, ttl_normal) {
auto key2 = io::BlockFileCache::hash("key6");
io::BlockFileCache cache(cache_base_path, settings);
ASSERT_TRUE(cache.initialize());
- while (true) {
+ for (int i = 0; i < 100; i++) {
if (cache.get_lazy_open_success()) {
break;
};
@@ -1653,7 +1653,7 @@ TEST_F(BlockFileCacheTest, ttl_modify) {
auto key2 = io::BlockFileCache::hash("key6");
io::BlockFileCache cache(cache_base_path, settings);
ASSERT_TRUE(cache.initialize());
- while (true) {
+ for (int i = 0; i < 100; i++) {
if (cache.get_lazy_open_success()) {
break;
};
@@ -1728,7 +1728,7 @@ TEST_F(BlockFileCacheTest, ttl_change_to_normal) {
auto key2 = io::BlockFileCache::hash("key2");
io::BlockFileCache cache(cache_base_path, settings);
ASSERT_TRUE(cache.initialize());
- while (true) {
+ for (int i = 0; i < 100; i++) {
if (cache.get_lazy_open_success()) {
break;
};
@@ -1792,7 +1792,7 @@ TEST_F(BlockFileCacheTest, ttl_change_expiration_time) {
auto key2 = io::BlockFileCache::hash("key2");
io::BlockFileCache cache(cache_base_path, settings);
ASSERT_TRUE(cache.initialize());
- while (true) {
+ for (int i = 0; i < 100; i++) {
if (cache.get_lazy_open_success()) {
break;
};
@@ -1855,7 +1855,7 @@ TEST_F(BlockFileCacheTest, ttl_reverse) {
auto key2 = io::BlockFileCache::hash("key2");
io::BlockFileCache cache(cache_base_path, settings);
ASSERT_TRUE(cache.initialize());
- while (true) {
+ for (int i = 0; i < 100; i++) {
if (cache.get_lazy_open_success()) {
break;
};
@@ -1907,7 +1907,7 @@ TEST_F(BlockFileCacheTest, io_error) {
auto key = io::BlockFileCache::hash("key1");
io::BlockFileCache cache(cache_base_path, settings);
ASSERT_TRUE(cache.initialize());
- while (true) {
+ for (int i = 0; i < 100; i++) {
if (cache.get_lazy_open_success()) {
break;
};
@@ -2063,7 +2063,7 @@ TEST_F(BlockFileCacheTest,
remove_directly_when_normal_change_to_ttl) {
auto key1 = io::BlockFileCache::hash("key1");
io::BlockFileCache cache(cache_base_path, settings);
ASSERT_TRUE(cache.initialize());
- while (true) {
+ for (int i = 0; i < 100; i++) {
if (cache.get_lazy_open_success()) {
break;
};
@@ -2139,7 +2139,7 @@ TEST_F(BlockFileCacheTest, recyle_cache_async) {
&guard3);
sp->enable_processing();
ASSERT_TRUE(cache.initialize());
- while (true) {
+ for (int i = 0; i < 100; i++) {
if (cache.get_lazy_open_success()) {
break;
};
@@ -2210,7 +2210,7 @@ TEST_F(BlockFileCacheTest, recyle_cache_async_ttl) {
&guard3);
sp->enable_processing();
ASSERT_TRUE(cache.initialize());
- while (true) {
+ for (int i = 0; i < 100; i++) {
if (cache.get_lazy_open_success()) {
break;
};
@@ -2273,7 +2273,7 @@ TEST_F(BlockFileCacheTest, remove_directly) {
context.cache_type = io::FileCacheType::TTL;
context.expiration_time = UnixSeconds() + 3600;
ASSERT_TRUE(cache.initialize());
- while (true) {
+ for (int i = 0; i < 100; i++) {
if (cache.get_lazy_open_success()) {
break;
};
@@ -2527,7 +2527,7 @@ TEST_F(BlockFileCacheTest, test_disposable) {
auto key = io::BlockFileCache::hash("key1");
io::BlockFileCache cache(cache_base_path, settings);
ASSERT_TRUE(cache.initialize());
- while (true) {
+ for (int i = 0; i < 100; i++) {
if (cache.get_lazy_open_success()) {
break;
};
@@ -2638,7 +2638,7 @@ TEST_F(BlockFileCacheTest, append_many_time) {
context.cache_type = FileCacheType::NORMAL;
io::BlockFileCache cache(cache_base_path, settings);
ASSERT_TRUE(cache.initialize());
- while (true) {
+ for (int i = 0; i < 100; i++) {
if (cache.get_lazy_open_success()) {
break;
};
@@ -2759,7 +2759,7 @@ TEST_F(BlockFileCacheTest, query_file_cache) {
{
io::BlockFileCache cache(cache_base_path, settings);
ASSERT_TRUE(cache.initialize());
- while (true) {
+ for (int i = 0; i < 100; i++) {
if (cache.get_lazy_open_success()) {
break;
};
@@ -2770,7 +2770,7 @@ TEST_F(BlockFileCacheTest, query_file_cache) {
config::enable_file_cache_query_limit = true;
io::BlockFileCache cache(cache_base_path, settings);
ASSERT_TRUE(cache.initialize());
- while (true) {
+ for (int i = 0; i < 100; i++) {
if (cache.get_lazy_open_success()) {
break;
};
@@ -2827,7 +2827,7 @@ TEST_F(BlockFileCacheTest, query_file_cache_reserve) {
config::enable_file_cache_query_limit = true;
io::BlockFileCache cache(cache_base_path, settings);
ASSERT_TRUE(cache.initialize());
- while (true) {
+ for (int i = 0; i < 100; i++) {
if (cache.get_lazy_open_success()) {
break;
};
@@ -3068,7 +3068,7 @@ TEST_F(BlockFileCacheTest,
cached_remote_file_reader_error_handle) {
context.query_id = query_id;
ASSERT_TRUE(FileCacheFactory::instance()->create_file_cache(cache_base_path,
settings).ok());
auto cache = FileCacheFactory::instance()->_caches[0].get();
- while (true) {
+ for (int i = 0; i < 100; i++) {
if (cache->get_lazy_open_success()) {
break;
};
@@ -3339,7 +3339,7 @@ TEST_F(BlockFileCacheTest, test_hot_data) {
int64_t expiration_time = UnixSeconds() + 300;
io::BlockFileCache cache(cache_base_path, settings);
ASSERT_TRUE(cache.initialize());
- while (true) {
+ for (int i = 0; i < 100; i++) {
if (cache.get_lazy_open_success()) {
break;
};
@@ -3422,7 +3422,7 @@ TEST_F(BlockFileCacheTest,
test_lazy_load_with_error_file_1) {
auto key = io::BlockFileCache::hash("key1");
io::BlockFileCache cache(cache_base_path, settings);
ASSERT_TRUE(cache.initialize());
- while (true) {
+ for (int i = 0; i < 100; i++) {
if (cache.get_lazy_open_success()) {
break;
};
@@ -3545,7 +3545,7 @@ TEST_F(BlockFileCacheTest,
test_lazy_load_with_error_file_2) {
ASSERT_TRUE(blocks[0]->append(result));
ASSERT_TRUE(blocks[0]->finalize());
flag1 = true;
- while (true) {
+ for (int i = 0; i < 100; i++) {
if (cache.get_lazy_open_success()) {
break;
};
@@ -3571,7 +3571,7 @@ TEST_F(BlockFileCacheTest,
test_check_disk_reource_limit_1) {
config::file_cache_enter_disk_resource_limit_mode_percent =
config::file_cache_exit_disk_resource_limit_mode_percent = 50;
ASSERT_TRUE(cache.initialize());
- while (true) {
+ for (int i = 0; i < 100; i++) {
if (cache.get_lazy_open_success()) {
break;
};
@@ -3601,7 +3601,7 @@ TEST_F(BlockFileCacheTest,
test_check_disk_reource_limit_2) {
config::file_cache_enter_disk_resource_limit_mode_percent = 2;
config::file_cache_exit_disk_resource_limit_mode_percent = 1;
ASSERT_TRUE(cache.initialize());
- while (true) {
+ for (int i = 0; i < 100; i++) {
if (cache.get_lazy_open_success()) {
break;
};
@@ -3632,7 +3632,7 @@ TEST_F(BlockFileCacheTest,
test_check_disk_reource_limit_3) {
cache._disk_resource_limit_mode = true;
config::file_cache_exit_disk_resource_limit_mode_percent = 98;
ASSERT_TRUE(cache.initialize());
- while (true) {
+ for (int i = 0; i < 100; i++) {
if (cache.get_lazy_open_success()) {
break;
};
@@ -3732,7 +3732,7 @@ TEST_F(BlockFileCacheTest,
remove_if_cached_when_isnt_releasable) {
auto key = io::BlockFileCache::hash("key1");
io::BlockFileCache cache(cache_base_path, settings);
ASSERT_TRUE(cache.initialize());
- while (true) {
+ for (int i = 0; i < 100; i++) {
if (cache.get_lazy_open_success()) {
break;
};
@@ -4032,4 +4032,69 @@ TEST_F(BlockFileCacheTest, remove_from_other_queue_2) {
}
}
+// Enables the TTL validity check with a 4-second interval, downloads twelve
+// 5-byte TTL blocks of one key ([0, 60) in steps of 5), sleeps for interval + 2
+// seconds without touching them, and then expects the cache to hold 5 bytes.
+// The "BlockFileCache::recycle_deleted_blocks" sync-point callback re-requests
+// range [0, 5) of the same key each time it fires, which is presumably the block
+// that accounts for the remaining 5 bytes (TODO confirm against get_or_set).
+TEST_F(BlockFileCacheTest, recyle_unvalid_ttl_async) {
+    config::file_cache_ttl_valid_check_interval_second = 4;
+    if (fs::exists(cache_base_path)) {
+        fs::remove_all(cache_base_path);
+    }
+    fs::create_directories(cache_base_path);
+    TUniqueId query_id;
+    query_id.hi = 1;
+    query_id.lo = 1;
+    io::FileCacheSettings settings;
+    settings.query_queue_size = 30;
+    settings.query_queue_elements = 5;
+    settings.index_queue_size = 30;
+    settings.index_queue_elements = 5;
+    settings.disposable_queue_size = 30;
+    settings.disposable_queue_elements = 5;
+    settings.capacity = 90;
+    settings.max_file_block_size = 30;
+    settings.max_query_cache_size = 30;
+    io::CacheContext context;
+    context.query_id = query_id;
+    auto key = io::BlockFileCache::hash("key1");
+    io::BlockFileCache cache(cache_base_path, settings);
+    context.cache_type = io::FileCacheType::TTL;
+    context.expiration_time = UnixSeconds() + 3600;
+    auto sp = SyncPoint::get_instance();
+    Defer defer {[sp] {
+        sp->clear_call_back("BlockFileCache::set_remove_batch");
+        sp->clear_call_back("BlockFileCache::recycle_deleted_blocks");
+        sp->clear_call_back("BlockFileCache::set_sleep_time");
+        // A fatal ASSERT_* below returns from the test early; reset the global
+        // config here as well so the TTL check never leaks into later tests.
+        config::file_cache_ttl_valid_check_interval_second = 0;
+    }};
+    sp->set_call_back("BlockFileCache::set_sleep_time",
+                      [](auto&& args) { *try_any_cast<int64_t*>(args[0]) = 1; });
+    sp->set_call_back("BlockFileCache::set_remove_batch",
+                      [](auto&& args) { *try_any_cast<int*>(args[0]) = 2; });
+    sp->set_call_back("BlockFileCache::recycle_deleted_blocks",
+                      [&](auto&&) { cache.get_or_set(key, 0, 5, context); });
+    sp->enable_processing();
+    ASSERT_TRUE(cache.initialize());
+    // Poll for lazy open for at most 100 iterations of 1 ms each.
+    for (int i = 0; i < 100; i++) {
+        if (cache.get_lazy_open_success()) {
+            break;
+        };
+        std::this_thread::sleep_for(std::chrono::milliseconds(1));
+    }
+    for (int64_t offset = 0; offset < 60; offset += 5) {
+        auto holder = cache.get_or_set(key, offset, 5, context);
+        auto segments = fromHolder(holder);
+        ASSERT_EQ(segments.size(), 1);
+        assert_range(1, segments[0], io::FileBlock::Range(offset, offset + 4),
+                     io::FileBlock::State::EMPTY);
+        ASSERT_TRUE(segments[0]->get_or_set_downloader() == io::FileBlock::get_caller_id());
+        download(segments[0]);
+        assert_range(1, segments[0], io::FileBlock::Range(offset, offset + 4),
+                     io::FileBlock::State::DOWNLOADED);
+    }
+    // Wait longer than the validity interval so untouched blocks become eligible
+    // for removal by the TTL check.
+    std::this_thread::sleep_for(
+            std::chrono::seconds(config::file_cache_ttl_valid_check_interval_second + 2));
+    // Turn the check off before inspecting the cache size.
+    config::file_cache_ttl_valid_check_interval_second = 0;
+    EXPECT_EQ(cache._cur_cache_size, 5);
+    if (fs::exists(cache_base_path)) {
+        fs::remove_all(cache_base_path);
+    }
+}
+
} // namespace doris::io
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]