gavinchou commented on code in PR #57922: URL: https://github.com/apache/doris/pull/57922#discussion_r2577488008
########## be/src/io/cache/block_file_cache_ttl_mgr.cpp: ########## @@ -0,0 +1,262 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "io/cache/block_file_cache_ttl_mgr.h" + +#include <chrono> +#include <memory> +#include <mutex> +#include <thread> +#include <unordered_set> +#include <vector> + +#include "common/config.h" +#include "common/logging.h" +#include "io/cache/block_file_cache.h" +#include "io/cache/cache_block_meta_store.h" +#include "io/cache/file_block.h" +#include "olap/base_tablet.h" +#include "runtime/exec_env.h" +#include "util/time.h" + +namespace doris::io { + +BlockFileCacheTtlMgr::BlockFileCacheTtlMgr(BlockFileCache* mgr, CacheBlockMetaStore* meta_store) + : _mgr(mgr), _meta_store(meta_store), _stop_background(false) { + // Start background threads + _update_ttl_thread = + std::thread(&BlockFileCacheTtlMgr::run_backgroud_update_ttl_info_map, this); + _expiration_check_thread = + std::thread(&BlockFileCacheTtlMgr::run_backgroud_expiration_check, this); + _tablet_id_flush_thread = + std::thread(&BlockFileCacheTtlMgr::run_background_tablet_id_flush, this); +} + +BlockFileCacheTtlMgr::~BlockFileCacheTtlMgr() { + _stop_background.store(true, std::memory_order_release); + + if 
(_update_ttl_thread.joinable()) { + _update_ttl_thread.join(); + } + + if (_expiration_check_thread.joinable()) { + _expiration_check_thread.join(); + } + + if (_tablet_id_flush_thread.joinable()) { + _tablet_id_flush_thread.join(); + } +} + +void BlockFileCacheTtlMgr::register_tablet_id(int64_t tablet_id) { + _tablet_id_queue.enqueue(tablet_id); +} + +void BlockFileCacheTtlMgr::run_background_tablet_id_flush() { + Thread::set_self_name("ttl_mgr_flush"); + + static constexpr size_t kBatchSize = 1024; + std::vector<int64_t> pending; + pending.reserve(kBatchSize); + + auto flush_pending = [this](std::vector<int64_t>* items) { + if (items->empty()) { + return; + } + std::lock_guard<std::mutex> lock(_tablet_id_mutex); + _tablet_id_set.insert(items->begin(), items->end()); + items->clear(); + }; + + auto drain_queue = [this, &pending, &flush_pending](bool* drained_flag) { + int64_t tablet_id = 0; + while (_tablet_id_queue.try_dequeue(tablet_id)) { + if (drained_flag != nullptr) { + *drained_flag = true; + } + pending.push_back(tablet_id); + if (pending.size() >= kBatchSize) { + flush_pending(&pending); + } + } + }; + + while (!_stop_background.load(std::memory_order_acquire)) { + bool drained = false; + drain_queue(&drained); + flush_pending(&pending); + + if (!drained) { + std::this_thread::sleep_for(std::chrono::milliseconds( + config::file_cache_background_tablet_id_flush_interval_ms)); + } + } + + // Drain remaining items before exit + drain_queue(nullptr); + flush_pending(&pending); +} + +FileBlocks BlockFileCacheTtlMgr::get_file_blocks_from_tablet_id(int64_t tablet_id) { + FileBlocks result; + + // Use meta store to get all blocks for this tablet + auto iterator = _meta_store->range_get(tablet_id); + if (!iterator) { + LOG(WARNING) << "Failed to get iterator for tablet_id: " << tablet_id; + return result; + } + + while (iterator->valid()) { + BlockMetaKey key = iterator->key(); + + // Get all blocks for this hash using get_blocks_by_key + try { + auto blocks_map = 
_mgr->get_blocks_by_key(key.hash); + for (const auto& [offset, block] : blocks_map) { + // Only add blocks that match our specific offset + if (offset == key.offset) { + result.push_back(block); + break; + } + } + } catch (const std::exception& e) { + LOG(WARNING) << "Failed to get file blocks for tablet_id: " << tablet_id + << ", hash: " << key.hash.to_string() << ", error: " << e.what(); + } + + iterator->next(); + } + + return result; +} + +void BlockFileCacheTtlMgr::run_backgroud_update_ttl_info_map() { + Thread::set_self_name("ttl_mgr_update"); + + while (!_stop_background.load(std::memory_order_acquire)) { + try { + std::unordered_set<int64_t> tablet_ids_to_process; + { + std::lock_guard<std::mutex> lock(_tablet_id_mutex); + tablet_ids_to_process = _tablet_id_set; + } + + for (int64_t tablet_id : tablet_ids_to_process) { Review Comment: We should check `_stop_background` on every tablet_id iteration so that the thread can exit quickly on shutdown. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
