github-actions[bot] commented on code in PR #63602:
URL: https://github.com/apache/doris/pull/63602#discussion_r3340630464


##########
be/src/service/http/action/file_cache_action.cpp:
##########
@@ -205,6 +219,27 @@ Status FileCacheAction::_handle_header(HttpRequest* req, 
std::string* json_metri
 }
 
 void FileCacheAction::handle(HttpRequest* req) {
+    if (_is_sync_clear_request(req)) {
+        const std::string header_json(HEADER_JSON);
+        req->add_output_header(HttpHeaders::CONTENT_TYPE, header_json.c_str());
+        req->mark_send_reply();
+        auto cancel_token = 
std::make_shared<io::BlockFileCache::ClearFileCacheCancelToken>();
+        req->set_cancel_callback(
+                [cancel_token] { cancel_token->cancelled.store(true, 
std::memory_order_release); });
+        std::thread([this, req, cancel_token] {
+            std::string json_metrics;

Review Comment:
   This introduces a new detached BE thread that runs file-cache clear work, 
but it does not initialize or attach a thread memory context. BE runtime 
guidance requires non-task background threads to initialize their thread 
context with `SCOPED_INIT_THREAD_CONTEXT`; otherwise, allocations performed by 
`_handle_header()`/`clear_file_caches()` in this worker are not attributed to 
the expected MemTracker. Please initialize the thread context at the start of 
the lambda, or run this work through an existing Doris thread pool or entry 
point that already does so.



##########
be/src/io/cache/block_file_cache.cpp:
##########
@@ -679,27 +715,201 @@ std::string BlockFileCache::clear_file_cache_async() {
                 LOG(INFO) << "cell is not releasable, hash="
                           << " offset=" << cell->file_block->offset();
                 cell->file_block->set_deleting();
-                ++num_cells_wait_recycle;
+                ++result.num_cells_wait_recycle;
                 continue;
             }
             FileBlockSPtr file_block = cell->file_block;
             if (file_block) {
                 std::lock_guard block_lock(file_block->_mutex);
                 remove(file_block, cache_lock, block_lock, false);
-                ++num_cells_to_delete;
+                ++result.num_cells_to_delete;
             }
         }
         clear_need_update_lru_blocks();
     }
 
-    std::stringstream ss;
-    ss << "finish clear_file_cache_async, path=" << _cache_base_path
-       << " num_files_all=" << num_files_all << " num_cells_all=" << 
num_cells_all
-       << " num_cells_to_delete=" << num_cells_to_delete
-       << " num_cells_wait_recycle=" << num_cells_wait_recycle;
-    auto msg = ss.str();
-    LOG(INFO) << msg;
     _lru_dumper->remove_lru_dump_files();
+    return result;
+}
+
+size_t BlockFileCache::count_deleting_blocks_unlocked(
+        std::lock_guard<std::mutex>& /* cache_lock */) const {
+    size_t deleting_blocks = 0;
+    for (const auto& [_, offset_to_cell] : _files) {
+        for (const auto& [_1, cell] : offset_to_cell) {
+            if (cell.file_block->is_deleting()) {
+                ++deleting_blocks;
+            }
+        }
+    }
+    return deleting_blocks;
+}
+
+bool BlockFileCache::try_dequeue_recycle_key(FileCacheKey* key) {
+    std::lock_guard lock(_recycle_keys_mutex);
+    if (!_recycle_keys.try_dequeue(*key)) {
+        return false;
+    }
+    ++_recycle_remove_inflight;
+    return true;
+}
+
+Status BlockFileCache::remove_dequeued_recycle_key(const FileCacheKey& key) {
+    FileCacheStorageRemoveContextPtr context;
+    Status st;
+    int64_t duration_ns = 0;
+    {
+        SCOPED_RAW_TIMER(&duration_ns);
+        st = _storage->remove(key, &context);
+    }
+    *_storage_async_remove_latency_us << (duration_ns / 1000);
+    if (context) {
+        Status fence_st = context->wait();
+        if (st.ok() && !fence_st.ok()) {
+            st = fence_st;
+        }
+    }
+    {
+        std::lock_guard lock(_recycle_keys_mutex);
+        DCHECK_GT(_recycle_remove_inflight, 0);
+        --_recycle_remove_inflight;
+    }
+    _recycle_keys_cv.notify_all();
+    return st;
+}
+
+BlockFileCache::ClearFileCacheResult BlockFileCache::drain_recycle_keys(
+        const std::shared_ptr<ClearFileCacheCancelToken>& cancel_token) {
+    ClearFileCacheResult result;
+    auto is_cancelled = [&]() {
+        return cancel_token != nullptr && 
cancel_token->cancelled.load(std::memory_order_acquire);
+    };
+    FileCacheKey key;
+    while (!is_cancelled() && try_dequeue_recycle_key(&key)) {
+        Status st = remove_dequeued_recycle_key(key);
+        ++result.num_recycle_drained;
+        if (!st.ok()) {
+            LOG_WARNING("").error(st);
+            if (result.status.ok()) {
+                result.status = st;
+            }
+        }
+        if (is_cancelled()) {
+            result.cancelled = true;
+            break;
+        }
+    }
+    if (is_cancelled()) {
+        result.cancelled = true;
+    }
+    *_recycle_keys_length_recorder << _recycle_keys.size_approx();
+    return result;
+}
+
+bool BlockFileCache::recycle_keys_idle() {
+    std::unique_lock lock(_recycle_keys_mutex);
+    return _recycle_keys.size_approx() == 0 && _recycle_remove_inflight == 0;
+}
+
+void BlockFileCache::refresh_metrics_unlocked(std::lock_guard<std::mutex>& 
cache_lock) {
+    _cur_cache_size_metrics->set_value(_cur_cache_size);
+    _cur_ttl_cache_size_metrics->set_value(_cur_ttl_size);
+    
_cur_ttl_cache_lru_queue_cache_size_metrics->set_value(_ttl_queue.get_capacity(cache_lock));
+    _cur_ttl_cache_lru_queue_element_count_metrics->set_value(
+            _ttl_queue.get_elements_num(cache_lock));
+    
_cur_normal_queue_cache_size_metrics->set_value(_normal_queue.get_capacity(cache_lock));
+    
_cur_normal_queue_element_count_metrics->set_value(_normal_queue.get_elements_num(cache_lock));
+    
_cur_index_queue_cache_size_metrics->set_value(_index_queue.get_capacity(cache_lock));
+    
_cur_index_queue_element_count_metrics->set_value(_index_queue.get_elements_num(cache_lock));
+    
_cur_disposable_queue_cache_size_metrics->set_value(_disposable_queue.get_capacity(cache_lock));
+    _cur_disposable_queue_element_count_metrics->set_value(
+            _disposable_queue.get_elements_num(cache_lock));
+}
+
+std::string BlockFileCache::clear_file_cache_sync(
+        std::shared_ptr<ClearFileCacheCancelToken> cancel_token) {
+    LOG(INFO) << "start clear_file_cache_sync, path=" << _cache_base_path;
+    std::lock_guard clear_lock(_clear_mutex);
+    using namespace std::chrono;
+    auto start = steady_clock::now();
+    ClearFileCacheResult result;
+
+    class TtlManagerPauseGuard {
+    public:
+        explicit TtlManagerPauseGuard(BlockFileCache* cache) : _cache(cache) {
+            _cache->pause_ttl_manager();
+        }
+        ~TtlManagerPauseGuard() { _cache->resume_ttl_manager(); }
+
+    private:
+        BlockFileCache* _cache;
+    } ttl_pause_guard(this);
+
+    append_clear_result(result, clear_file_cache_async_impl());
+

Review Comment:
   This staged clear no longer prevents normal cache readers from creating new 
blocks while the synchronous clear is still running. 
`clear_file_cache_async_impl()` removes or marks only the entries that existed 
during its initial `_mutex` critical section; after that, 
`clear_file_cache_sync()` waits for deleting blocks and recycle keys to drain 
without setting any state that `get_or_set()` checks. A reader can therefore 
insert and download a new block after this line but before the loop reaches 
`deleting_blocks == 0 && recycle_keys_idle()`. Because that new block is 
neither marked deleting nor queued for recycling, the exit condition never 
sees it, and the method returns success with the block still present. The old 
sync path cleared `_files` and storage under the cache lock, giving the clear 
operation a linearization point at the end of that critical section. Please 
either block (or mark) new cache insertions while a sync clear is in progress, 
or make the final success condition account for entries created during the 
clear window.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to