This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new d330f7ba504 (cloud-merge)  Support to validate TTL keys (#34142)
d330f7ba504 is described below

commit d330f7ba504106e55358b8403a39020d4a95cae5
Author: Lightman <[email protected]>
AuthorDate: Wed May 1 09:10:28 2024 +0800

    (cloud-merge)  Support to validate TTL keys (#34142)
    
    
    
    Co-authored-by: Yongqiang YANG <[email protected]>
---
 be/src/cloud/cloud_storage_engine.cpp      |  35 ++++++++
 be/src/cloud/cloud_storage_engine.h        |   2 +
 be/src/common/config.cpp                   |   1 +
 be/src/common/config.h                     |   1 +
 be/src/io/cache/block_file_cache.cpp       |  56 +++++++++---
 be/src/io/cache/block_file_cache.h         |   2 +
 be/test/io/cache/block_file_cache_test.cpp | 137 +++++++++++++++++++++--------
 7 files changed, 188 insertions(+), 46 deletions(-)

diff --git a/be/src/cloud/cloud_storage_engine.cpp b/be/src/cloud/cloud_storage_engine.cpp
index 269ce4f231c..40e8258ec75 100644
--- a/be/src/cloud/cloud_storage_engine.cpp
+++ b/be/src/cloud/cloud_storage_engine.cpp
@@ -24,6 +24,7 @@
 #include <rapidjson/prettywriter.h>
 #include <rapidjson/stringbuffer.h>
 
+#include <algorithm>
 #include <variant>
 
 #include "cloud/cloud_base_compaction.h"
@@ -37,6 +38,7 @@
 #include "cloud/cloud_warm_up_manager.h"
 #include "cloud/config.h"
 #include "io/cache/block_file_cache_downloader.h"
+#include "io/cache/file_cache_common.h"
 #include "io/fs/file_system.h"
 #include "io/fs/hdfs_file_system.h"
 #include "io/fs/s3_file_system.h"
@@ -274,11 +276,44 @@ Status CloudStorageEngine::start_bg_threads() {
     RETURN_IF_ERROR(Thread::create(
             "StorageEngine", "lease_compaction_thread",
             [this]() { this->_lease_compaction_thread_callback(); }, &_bg_threads.emplace_back()));
+
+    if (config::file_cache_ttl_valid_check_interval_second != 0) {
+        RETURN_IF_ERROR(Thread::create(
+                "StorageEngine", "check_file_cache_ttl_block_valid_thread",
+                [this]() { this->_check_file_cache_ttl_block_valid(); },
+                &_bg_threads.emplace_back()));
+        LOG(INFO) << "check file cache ttl block valid thread started";
+    }
+
     LOG(INFO) << "lease compaction thread started";
 
     return Status::OK();
 }
 
+void CloudStorageEngine::_check_file_cache_ttl_block_valid() {
+    int64_t interval_seconds = config::file_cache_ttl_valid_check_interval_second / 2;
+    auto check_ttl = [](const std::weak_ptr<CloudTablet>& tablet_wk) {
+        auto tablet = tablet_wk.lock();
+        if (!tablet) return;
+        if (tablet->tablet_meta()->ttl_seconds() == 0) return;
+        auto rowsets = tablet->get_snapshot_rowset();
+        for (const auto& rowset : rowsets) {
+            int64_t ttl_seconds = tablet->tablet_meta()->ttl_seconds();
+            if (rowset->newest_write_timestamp() + ttl_seconds <= UnixSeconds()) continue;
+            for (int64_t seg_id = 0; seg_id < rowset->num_segments(); seg_id++) {
+                auto seg_path = rowset->segment_file_path(seg_id);
+                auto hash = io::BlockFileCache::hash(io::Path(seg_path).filename().native());
+                auto* file_cache = io::FileCacheFactory::instance()->get_by_path(hash);
+                file_cache->update_ttl_atime(hash);
+            }
+        }
+    };
+    while (!_stop_background_threads_latch.wait_for(std::chrono::seconds(interval_seconds))) {
+        auto weak_tablets = tablet_mgr().get_weak_tablets();
+        std::for_each(weak_tablets.begin(), weak_tablets.end(), check_ttl);
+    }
+}
+
 void CloudStorageEngine::sync_storage_vault() {
     cloud::StorageVaultInfos vault_infos;
     auto st = _meta_mgr->get_storage_vault_info(&vault_infos);
diff --git a/be/src/cloud/cloud_storage_engine.h b/be/src/cloud/cloud_storage_engine.h
index 6311fa21993..e482a5cff61 100644
--- a/be/src/cloud/cloud_storage_engine.h
+++ b/be/src/cloud/cloud_storage_engine.h
@@ -25,6 +25,7 @@
 #include "cloud/cloud_cumulative_compaction_policy.h"
 #include "cloud/cloud_tablet.h"
 #include "cloud_txn_delete_bitmap_cache.h"
+#include "io/cache/block_file_cache_factory.h"
 #include "olap/storage_engine.h"
 #include "olap/storage_policy.h"
 #include "util/threadpool.h"
@@ -71,6 +72,7 @@ public:
     ThreadPool& calc_tablet_delete_bitmap_task_thread_pool() const {
         return *_calc_tablet_delete_bitmap_task_thread_pool;
     }
+    void _check_file_cache_ttl_block_valid();
 
     io::FileSystemSPtr get_fs_by_vault_id(const std::string& vault_id) const {
         if (vault_id.empty()) {
diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index e5686adf76c..b0612958d35 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -990,6 +990,7 @@ DEFINE_mInt32(file_cache_enter_disk_resource_limit_mode_percent, "90");
 DEFINE_mInt32(file_cache_exit_disk_resource_limit_mode_percent, "80");
 DEFINE_mBool(enable_read_cache_file_directly, "false");
 DEFINE_mBool(file_cache_enable_evict_from_other_queue_by_size, "false");
+DEFINE_mInt64(file_cache_ttl_valid_check_interval_second, "0"); // zero for not checking
 
 DEFINE_mInt32(index_cache_entry_stay_time_after_lookup_s, "1800");
 DEFINE_mInt32(inverted_index_cache_stale_sweep_time_sec, "600");
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 0e4300c3c97..f2cdc35ed07 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -1034,6 +1034,7 @@ DECLARE_Int32(file_cache_enter_disk_resource_limit_mode_percent);
 DECLARE_Int32(file_cache_exit_disk_resource_limit_mode_percent);
 DECLARE_mBool(enable_read_cache_file_directly);
 DECLARE_Bool(file_cache_enable_evict_from_other_queue_by_size);
+DECLARE_mInt64(file_cache_ttl_valid_check_interval_second);
 
 // inverted index searcher cache
 // cache entry stay time after lookup
diff --git a/be/src/io/cache/block_file_cache.cpp b/be/src/io/cache/block_file_cache.cpp
index b98cd6aec70..b828bbac591 100644
--- a/be/src/io/cache/block_file_cache.cpp
+++ b/be/src/io/cache/block_file_cache.cpp
@@ -30,6 +30,7 @@
 
 #include <chrono> // IWYU pragma: keep
 #include <mutex>
+#include <ranges>
 
 #include "common/config.h"
 #include "common/logging.h"
@@ -399,7 +400,6 @@ std::string BlockFileCache::clear_file_cache_async() {
             _async_clear_file_cache = true;
         }
     }
-    TEST_SYNC_POINT_CALLBACK("BlockFileCache::recycle_deleted_blocks");
     std::stringstream ss;
     ss << "finish clear_file_cache_async, path=" << _cache_base_path
        << " num_files_all=" << num_files_all << " num_cells_all=" << num_cells_all
@@ -410,10 +410,19 @@ std::string BlockFileCache::clear_file_cache_async() {
 }
 
 void BlockFileCache::recycle_deleted_blocks() {
+    using namespace std::chrono;
+    static int remove_batch = 100;
+    TEST_SYNC_POINT_CALLBACK("BlockFileCache::set_remove_batch", &remove_batch);
+    TEST_SYNC_POINT_CALLBACK("BlockFileCache::recycle_deleted_blocks");
     std::unique_lock cache_lock(_mutex);
+    auto remove_file_block = [&cache_lock, this](FileBlockCell* cell) {
+        std::lock_guard segment_lock(cell->file_block->_mutex);
+        remove(cell->file_block, cache_lock, segment_lock);
+    };
+    int i = 0;
+    std::condition_variable cond;
+    auto start_time = steady_clock::time_point();
     if (_async_clear_file_cache) {
-        using namespace std::chrono;
-        auto start_time = steady_clock::time_point();
         LOG_INFO("Start clear file cache async").tag("path", _cache_base_path);
         auto remove_file_block = [&cache_lock, this](FileBlockCell* cell) {
             std::lock_guard segment_lock(cell->file_block->_mutex);
@@ -442,7 +451,7 @@ void BlockFileCache::recycle_deleted_blocks() {
                         cells.push_back(cell);
                     }
                 }
-                std::for_each(cells.begin(), cells.end(), remove_file_block);
+                std::ranges::for_each(cells, remove_file_block);
                 // just for sleep
                 cond.wait_for(cache_lock, std::chrono::microseconds(100));
             }
@@ -450,6 +459,8 @@ void BlockFileCache::recycle_deleted_blocks() {
         iter_queue(get_queue(FileCacheType::DISPOSABLE));
         iter_queue(get_queue(FileCacheType::NORMAL));
         iter_queue(get_queue(FileCacheType::INDEX));
+    }
+    if (_async_clear_file_cache || config::file_cache_ttl_valid_check_interval_second != 0) {
         std::vector<UInt128Wrapper> ttl_keys;
         ttl_keys.reserve(_key_to_time.size());
         for (auto& [key, _] : _key_to_time) {
@@ -465,6 +476,17 @@ void BlockFileCache::recycle_deleted_blocks() {
                 std::vector<FileBlockCell*> cells;
                 cells.reserve(iter->second.size());
                 for (auto& [_, cell] : iter->second) {
+                    cell.is_deleted =
+                            cell.is_deleted
+                                    ? true
+                                    : (config::file_cache_ttl_valid_check_interval_second == 0
+                                               ? false
+                                               : std::chrono::duration_cast<std::chrono::seconds>(
+                                                         std::chrono::steady_clock::now()
+                                                                 .time_since_epoch())
+                                                                         .count() -
+                                                                 cell.atime >
+                                                         config::file_cache_ttl_valid_check_interval_second);
                     if (!cell.is_deleted) {
                         continue;
                     } else if (cell.releasable()) {
@@ -472,14 +494,16 @@ void BlockFileCache::recycle_deleted_blocks() {
                         i++;
                     }
                 }
-                std::for_each(cells.begin(), cells.end(), remove_file_block);
+                std::ranges::for_each(cells, remove_file_block);
             }
         }
-        _async_clear_file_cache = false;
-        auto use_time = duration_cast<milliseconds>(steady_clock::time_point() - start_time);
-        LOG_INFO("End clear file cache async")
-                .tag("path", _cache_base_path)
-                .tag("use_time", static_cast<int64_t>(use_time.count()));
+        if (_async_clear_file_cache) {
+            _async_clear_file_cache = false;
+            auto use_time = duration_cast<milliseconds>(steady_clock::time_point() - start_time);
+            LOG_INFO("End clear file cache async")
+                    .tag("path", _cache_base_path)
+                    .tag("use_time", static_cast<int64_t>(use_time.count()));
+        }
     }
 }
 
@@ -1254,6 +1278,9 @@ BlockFileCache::FileBlockCell::FileBlockCell(FileBlockSPtr file_block,
         DCHECK(false) << "Can create cell with either EMPTY, DOWNLOADED, SKIP_CACHE state, got: "
                       << FileBlock::state_to_string(file_block->_download_state);
     }
+    if (file_block->cache_type() == FileCacheType::TTL) {
+        update_atime();
+    }
 }
 
 BlockFileCache::LRUQueue::Iterator BlockFileCache::LRUQueue::add(
@@ -1644,6 +1671,15 @@ std::map<size_t, FileBlockSPtr> BlockFileCache::get_blocks_by_key(const UInt128W
     return offset_to_block;
 }
 
+void BlockFileCache::update_ttl_atime(const UInt128Wrapper& hash) {
+    std::lock_guard lock(_mutex);
+    if (auto iter = _files.find(hash); iter != _files.end()) {
+        for (auto& [_, cell] : iter->second) {
+            cell.update_atime();
+        }
+    };
+}
+
 template void BlockFileCache::remove(FileBlockSPtr file_block,
                                      std::lock_guard<std::mutex>& cache_lock,
                                      std::lock_guard<std::mutex>& block_lock);
diff --git a/be/src/io/cache/block_file_cache.h b/be/src/io/cache/block_file_cache.h
index 3efe41e6402..86ce1dc1196 100644
--- a/be/src/io/cache/block_file_cache.h
+++ b/be/src/io/cache/block_file_cache.h
@@ -131,6 +131,8 @@ public:
     bool try_reserve(const UInt128Wrapper& hash, const CacheContext& context, size_t offset,
                      size_t size, std::lock_guard<std::mutex>& cache_lock);
 
+    void update_ttl_atime(const UInt128Wrapper& hash);
+
     class LRUQueue {
     public:
         LRUQueue() = default;
diff --git a/be/test/io/cache/block_file_cache_test.cpp b/be/test/io/cache/block_file_cache_test.cpp
index 244f10fa754..362df33aa28 100644
--- a/be/test/io/cache/block_file_cache_test.cpp
+++ b/be/test/io/cache/block_file_cache_test.cpp
@@ -242,7 +242,7 @@ void test_file_cache(io::FileCacheType cache_type) {
         io::BlockFileCache mgr(cache_base_path, settings);
         ASSERT_TRUE(mgr.initialize().ok());
 
-        while (true) {
+        for (int i = 0; i < 100; i++) {
             if (mgr.get_lazy_open_success()) {
                 break;
             };
@@ -580,7 +580,7 @@ void test_file_cache(io::FileCacheType cache_type) {
 
         io::BlockFileCache cache2(cache_base_path, settings);
         ASSERT_TRUE(cache2.initialize().ok());
-        while (true) {
+        for (int i = 0; i < 100; i++) {
             if (cache2.get_lazy_open_success()) {
                 break;
             };
@@ -618,7 +618,7 @@ void test_file_cache(io::FileCacheType cache_type) {
         settings2.max_file_block_size = 10;
         io::BlockFileCache cache2(cache_path2, settings2);
         ASSERT_TRUE(cache2.initialize().ok());
-        while (true) {
+        for (int i = 0; i < 100; i++) {
             if (cache2.get_lazy_open_success()) {
                 break;
             };
@@ -671,7 +671,7 @@ TEST_F(BlockFileCacheTest, resize) {
     settings.max_file_block_size = 100;
     io::BlockFileCache cache(cache_base_path, settings);
     ASSERT_TRUE(cache.initialize());
-    while (true) {
+    for (int i = 0; i < 100; i++) {
         if (cache.get_lazy_open_success()) {
             break;
         };
@@ -753,7 +753,7 @@ TEST_F(BlockFileCacheTest, query_limit_heap_use_after_free) {
     settings.capacity = 15;
     io::BlockFileCache cache(cache_base_path, settings);
     ASSERT_TRUE(cache.initialize());
-    while (true) {
+    for (int i = 0; i < 100; i++) {
         if (cache.get_lazy_open_success()) {
             break;
         };
@@ -838,7 +838,7 @@ TEST_F(BlockFileCacheTest, query_limit_dcheck) {
     settings.capacity = 15;
     io::BlockFileCache cache(cache_base_path, settings);
     ASSERT_TRUE(cache.initialize());
-    while (true) {
+    for (int i = 0; i < 100; i++) {
         if (cache.get_lazy_open_success()) {
             break;
         };
@@ -955,7 +955,7 @@ TEST_F(BlockFileCacheTest, reset_range) {
     io::BlockFileCache cache(cache_base_path, settings);
     EXPECT_EQ(cache.capacity(), 15);
     ASSERT_TRUE(cache.initialize());
-    while (true) {
+    for (int i = 0; i < 100; i++) {
         if (cache.get_lazy_open_success()) {
             break;
         };
@@ -1005,7 +1005,7 @@ TEST_F(BlockFileCacheTest, change_cache_type) {
     settings.capacity = 30;
     io::BlockFileCache cache(cache_base_path, settings);
     ASSERT_TRUE(cache.initialize());
-    while (true) {
+    for (int i = 0; i < 100; i++) {
         if (cache.get_lazy_open_success()) {
             break;
         };
@@ -1055,7 +1055,7 @@ TEST_F(BlockFileCacheTest, fd_cache_remove) {
     settings.capacity = 15;
     io::BlockFileCache cache(cache_base_path, settings);
     ASSERT_TRUE(cache.initialize());
-    while (true) {
+    for (int i = 0; i < 100; i++) {
         if (cache.get_lazy_open_success()) {
             break;
         };
@@ -1137,7 +1137,7 @@ TEST_F(BlockFileCacheTest, fd_cache_evict) {
     settings.capacity = 15;
     io::BlockFileCache cache(cache_base_path, settings);
     ASSERT_TRUE(cache.initialize());
-    while (true) {
+    for (int i = 0; i < 100; i++) {
         if (cache.get_lazy_open_success()) {
             break;
         };
@@ -1283,7 +1283,7 @@ void test_file_cache_run_in_resource_limit(io::FileCacheType cache_type) {
         io::BlockFileCache cache(cache_base_path, settings);
         cache._index_queue.hot_data_interval = 0;
         ASSERT_TRUE(cache.initialize());
-        while (true) {
+        for (int i = 0; i < 100; i++) {
             if (cache.get_lazy_open_success()) {
                 break;
             };
@@ -1465,7 +1465,7 @@ TEST_F(BlockFileCacheTest, test_lazy_load) {
     ASSERT_TRUE(blocks[0]->append(result));
     ASSERT_TRUE(blocks[0]->finalize());
     flag1 = true;
-    while (true) {
+    for (int i = 0; i < 100; i++) {
         if (cache.get_lazy_open_success()) {
             break;
         };
@@ -1527,7 +1527,7 @@ TEST_F(BlockFileCacheTest, test_lazy_load_with_limit) {
     ASSERT_TRUE(blocks[0]->append(result));
     ASSERT_TRUE(blocks[0]->finalize());
     flag1 = true;
-    while (true) {
+    for (int i = 0; i < 100; i++) {
         if (cache.get_lazy_open_success()) {
             break;
         };
@@ -1570,7 +1570,7 @@ TEST_F(BlockFileCacheTest, ttl_normal) {
     auto key2 = io::BlockFileCache::hash("key6");
     io::BlockFileCache cache(cache_base_path, settings);
     ASSERT_TRUE(cache.initialize());
-    while (true) {
+    for (int i = 0; i < 100; i++) {
         if (cache.get_lazy_open_success()) {
             break;
         };
@@ -1653,7 +1653,7 @@ TEST_F(BlockFileCacheTest, ttl_modify) {
     auto key2 = io::BlockFileCache::hash("key6");
     io::BlockFileCache cache(cache_base_path, settings);
     ASSERT_TRUE(cache.initialize());
-    while (true) {
+    for (int i = 0; i < 100; i++) {
         if (cache.get_lazy_open_success()) {
             break;
         };
@@ -1728,7 +1728,7 @@ TEST_F(BlockFileCacheTest, ttl_change_to_normal) {
     auto key2 = io::BlockFileCache::hash("key2");
     io::BlockFileCache cache(cache_base_path, settings);
     ASSERT_TRUE(cache.initialize());
-    while (true) {
+    for (int i = 0; i < 100; i++) {
         if (cache.get_lazy_open_success()) {
             break;
         };
@@ -1792,7 +1792,7 @@ TEST_F(BlockFileCacheTest, ttl_change_expiration_time) {
     auto key2 = io::BlockFileCache::hash("key2");
     io::BlockFileCache cache(cache_base_path, settings);
     ASSERT_TRUE(cache.initialize());
-    while (true) {
+    for (int i = 0; i < 100; i++) {
         if (cache.get_lazy_open_success()) {
             break;
         };
@@ -1855,7 +1855,7 @@ TEST_F(BlockFileCacheTest, ttl_reverse) {
     auto key2 = io::BlockFileCache::hash("key2");
     io::BlockFileCache cache(cache_base_path, settings);
     ASSERT_TRUE(cache.initialize());
-    while (true) {
+    for (int i = 0; i < 100; i++) {
         if (cache.get_lazy_open_success()) {
             break;
         };
@@ -1907,7 +1907,7 @@ TEST_F(BlockFileCacheTest, io_error) {
     auto key = io::BlockFileCache::hash("key1");
     io::BlockFileCache cache(cache_base_path, settings);
     ASSERT_TRUE(cache.initialize());
-    while (true) {
+    for (int i = 0; i < 100; i++) {
         if (cache.get_lazy_open_success()) {
             break;
         };
@@ -2063,7 +2063,7 @@ TEST_F(BlockFileCacheTest, remove_directly_when_normal_change_to_ttl) {
     auto key1 = io::BlockFileCache::hash("key1");
     io::BlockFileCache cache(cache_base_path, settings);
     ASSERT_TRUE(cache.initialize());
-    while (true) {
+    for (int i = 0; i < 100; i++) {
         if (cache.get_lazy_open_success()) {
             break;
         };
@@ -2139,7 +2139,7 @@ TEST_F(BlockFileCacheTest, recyle_cache_async) {
             &guard3);
     sp->enable_processing();
     ASSERT_TRUE(cache.initialize());
-    while (true) {
+    for (int i = 0; i < 100; i++) {
         if (cache.get_lazy_open_success()) {
             break;
         };
@@ -2210,7 +2210,7 @@ TEST_F(BlockFileCacheTest, recyle_cache_async_ttl) {
             &guard3);
     sp->enable_processing();
     ASSERT_TRUE(cache.initialize());
-    while (true) {
+    for (int i = 0; i < 100; i++) {
         if (cache.get_lazy_open_success()) {
             break;
         };
@@ -2273,7 +2273,7 @@ TEST_F(BlockFileCacheTest, remove_directly) {
     context.cache_type = io::FileCacheType::TTL;
     context.expiration_time = UnixSeconds() + 3600;
     ASSERT_TRUE(cache.initialize());
-    while (true) {
+    for (int i = 0; i < 100; i++) {
         if (cache.get_lazy_open_success()) {
             break;
         };
@@ -2527,7 +2527,7 @@ TEST_F(BlockFileCacheTest, test_disposable) {
     auto key = io::BlockFileCache::hash("key1");
     io::BlockFileCache cache(cache_base_path, settings);
     ASSERT_TRUE(cache.initialize());
-    while (true) {
+    for (int i = 0; i < 100; i++) {
         if (cache.get_lazy_open_success()) {
             break;
         };
@@ -2638,7 +2638,7 @@ TEST_F(BlockFileCacheTest, append_many_time) {
     context.cache_type = FileCacheType::NORMAL;
     io::BlockFileCache cache(cache_base_path, settings);
     ASSERT_TRUE(cache.initialize());
-    while (true) {
+    for (int i = 0; i < 100; i++) {
         if (cache.get_lazy_open_success()) {
             break;
         };
@@ -2759,7 +2759,7 @@ TEST_F(BlockFileCacheTest, query_file_cache) {
     {
         io::BlockFileCache cache(cache_base_path, settings);
         ASSERT_TRUE(cache.initialize());
-        while (true) {
+        for (int i = 0; i < 100; i++) {
             if (cache.get_lazy_open_success()) {
                 break;
             };
@@ -2770,7 +2770,7 @@ TEST_F(BlockFileCacheTest, query_file_cache) {
     config::enable_file_cache_query_limit = true;
     io::BlockFileCache cache(cache_base_path, settings);
     ASSERT_TRUE(cache.initialize());
-    while (true) {
+    for (int i = 0; i < 100; i++) {
         if (cache.get_lazy_open_success()) {
             break;
         };
@@ -2827,7 +2827,7 @@ TEST_F(BlockFileCacheTest, query_file_cache_reserve) {
     config::enable_file_cache_query_limit = true;
     io::BlockFileCache cache(cache_base_path, settings);
     ASSERT_TRUE(cache.initialize());
-    while (true) {
+    for (int i = 0; i < 100; i++) {
         if (cache.get_lazy_open_success()) {
             break;
         };
@@ -3068,7 +3068,7 @@ TEST_F(BlockFileCacheTest, cached_remote_file_reader_error_handle) {
     context.query_id = query_id;
     ASSERT_TRUE(FileCacheFactory::instance()->create_file_cache(cache_base_path, settings).ok());
     auto cache = FileCacheFactory::instance()->_caches[0].get();
-    while (true) {
+    for (int i = 0; i < 100; i++) {
         if (cache->get_lazy_open_success()) {
             break;
         };
@@ -3339,7 +3339,7 @@ TEST_F(BlockFileCacheTest, test_hot_data) {
     int64_t expiration_time = UnixSeconds() + 300;
     io::BlockFileCache cache(cache_base_path, settings);
     ASSERT_TRUE(cache.initialize());
-    while (true) {
+    for (int i = 0; i < 100; i++) {
         if (cache.get_lazy_open_success()) {
             break;
         };
@@ -3422,7 +3422,7 @@ TEST_F(BlockFileCacheTest, test_lazy_load_with_error_file_1) {
     auto key = io::BlockFileCache::hash("key1");
     io::BlockFileCache cache(cache_base_path, settings);
     ASSERT_TRUE(cache.initialize());
-    while (true) {
+    for (int i = 0; i < 100; i++) {
         if (cache.get_lazy_open_success()) {
             break;
         };
@@ -3545,7 +3545,7 @@ TEST_F(BlockFileCacheTest, test_lazy_load_with_error_file_2) {
     ASSERT_TRUE(blocks[0]->append(result));
     ASSERT_TRUE(blocks[0]->finalize());
     flag1 = true;
-    while (true) {
+    for (int i = 0; i < 100; i++) {
         if (cache.get_lazy_open_success()) {
             break;
         };
@@ -3571,7 +3571,7 @@ TEST_F(BlockFileCacheTest, test_check_disk_reource_limit_1) {
     config::file_cache_enter_disk_resource_limit_mode_percent =
             config::file_cache_exit_disk_resource_limit_mode_percent = 50;
     ASSERT_TRUE(cache.initialize());
-    while (true) {
+    for (int i = 0; i < 100; i++) {
         if (cache.get_lazy_open_success()) {
             break;
         };
@@ -3601,7 +3601,7 @@ TEST_F(BlockFileCacheTest, test_check_disk_reource_limit_2) {
     config::file_cache_enter_disk_resource_limit_mode_percent = 2;
     config::file_cache_exit_disk_resource_limit_mode_percent = 1;
     ASSERT_TRUE(cache.initialize());
-    while (true) {
+    for (int i = 0; i < 100; i++) {
         if (cache.get_lazy_open_success()) {
             break;
         };
@@ -3632,7 +3632,7 @@ TEST_F(BlockFileCacheTest, test_check_disk_reource_limit_3) {
     cache._disk_resource_limit_mode = true;
     config::file_cache_exit_disk_resource_limit_mode_percent = 98;
     ASSERT_TRUE(cache.initialize());
-    while (true) {
+    for (int i = 0; i < 100; i++) {
         if (cache.get_lazy_open_success()) {
             break;
         };
@@ -3732,7 +3732,7 @@ TEST_F(BlockFileCacheTest, remove_if_cached_when_isnt_releasable) {
     auto key = io::BlockFileCache::hash("key1");
     io::BlockFileCache cache(cache_base_path, settings);
     ASSERT_TRUE(cache.initialize());
-    while (true) {
+    for (int i = 0; i < 100; i++) {
         if (cache.get_lazy_open_success()) {
             break;
         };
@@ -4032,4 +4032,69 @@ TEST_F(BlockFileCacheTest, remove_from_other_queue_2) {
     }
 }
 
+TEST_F(BlockFileCacheTest, recyle_unvalid_ttl_async) {
+    config::file_cache_ttl_valid_check_interval_second = 4;
+    if (fs::exists(cache_base_path)) {
+        fs::remove_all(cache_base_path);
+    }
+    fs::create_directories(cache_base_path);
+    TUniqueId query_id;
+    query_id.hi = 1;
+    query_id.lo = 1;
+    io::FileCacheSettings settings;
+    settings.query_queue_size = 30;
+    settings.query_queue_elements = 5;
+    settings.index_queue_size = 30;
+    settings.index_queue_elements = 5;
+    settings.disposable_queue_size = 30;
+    settings.disposable_queue_elements = 5;
+    settings.capacity = 90;
+    settings.max_file_block_size = 30;
+    settings.max_query_cache_size = 30;
+    io::CacheContext context;
+    context.query_id = query_id;
+    auto key = io::BlockFileCache::hash("key1");
+    io::BlockFileCache cache(cache_base_path, settings);
+    context.cache_type = io::FileCacheType::TTL;
+    context.expiration_time = UnixSeconds() + 3600;
+    auto sp = SyncPoint::get_instance();
+    Defer defer {[sp] {
+        sp->clear_call_back("BlockFileCache::set_remove_batch");
+        sp->clear_call_back("BlockFileCache::recycle_deleted_blocks");
+        sp->clear_call_back("BlockFileCache::set_sleep_time");
+    }};
+    sp->set_call_back("BlockFileCache::set_sleep_time",
+                      [](auto&& args) { *try_any_cast<int64_t*>(args[0]) = 1; });
+    sp->set_call_back("BlockFileCache::set_remove_batch",
+                      [](auto&& args) { *try_any_cast<int*>(args[0]) = 2; });
+    sp->set_call_back("BlockFileCache::recycle_deleted_blocks",
+                      [&](auto&&) { cache.get_or_set(key, 0, 5, context); });
+    sp->enable_processing();
+    ASSERT_TRUE(cache.initialize());
+    for (int i = 0; i < 100; i++) {
+        if (cache.get_lazy_open_success()) {
+            break;
+        };
+        std::this_thread::sleep_for(std::chrono::milliseconds(1));
+    }
+    for (int64_t offset = 0; offset < 60; offset += 5) {
+        auto holder = cache.get_or_set(key, offset, 5, context);
+        auto segments = fromHolder(holder);
+        ASSERT_EQ(segments.size(), 1);
+        assert_range(1, segments[0], io::FileBlock::Range(offset, offset + 4),
+                     io::FileBlock::State::EMPTY);
+        ASSERT_TRUE(segments[0]->get_or_set_downloader() == io::FileBlock::get_caller_id());
+        download(segments[0]);
+        assert_range(1, segments[0], io::FileBlock::Range(offset, offset + 4),
+                     io::FileBlock::State::DOWNLOADED);
+    }
+    std::this_thread::sleep_for(
+            std::chrono::seconds(config::file_cache_ttl_valid_check_interval_second + 2));
+    config::file_cache_ttl_valid_check_interval_second = 0;
+    EXPECT_EQ(cache._cur_cache_size, 5);
+    if (fs::exists(cache_base_path)) {
+        fs::remove_all(cache_base_path);
+    }
+}
+
 } // namespace doris::io


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to