This is an automated email from the ASF dual-hosted git repository.
gavinchou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new cf1714c52b7 [enhancement](cloud) optimize block cache lock (#41818)
cf1714c52b7 is described below
commit cf1714c52b7fbe8fd1eb47770c0bbbfefd1071c4
Author: zhengyu <[email protected]>
AuthorDate: Wed Oct 30 17:48:04 2024 +0800
[enhancement](cloud) optimize block cache lock (#41818)
1. delete cached files asynchronously when recycling stale rowsets
2. minimize the size of cache-lock critical sections
3. add cache lock hold & wait time info for debugging
---
be/CMakeLists.txt | 4 +
be/src/cloud/cloud_tablet.cpp | 2 +-
be/src/common/config.cpp | 3 +
be/src/common/config.h | 3 +
be/src/io/cache/block_file_cache.cpp | 108 ++++++++++++++++++++-------
be/src/io/cache/block_file_cache.h | 48 +++++++++++-
be/src/io/cache/block_file_cache_profile.cpp | 6 +-
be/src/io/cache/file_block.cpp | 32 +++++---
be/src/io/cache/fs_file_cache_storage.cpp | 3 +-
build.sh | 6 ++
10 files changed, 172 insertions(+), 43 deletions(-)
diff --git a/be/CMakeLists.txt b/be/CMakeLists.txt
index 25722baf076..1d79048f965 100644
--- a/be/CMakeLists.txt
+++ b/be/CMakeLists.txt
@@ -342,6 +342,10 @@ if (ENABLE_INJECTION_POINT)
set(CXX_COMMON_FLAGS "${CXX_COMMON_FLAGS} -DENABLE_INJECTION_POINT")
endif()
+if (ENABLE_CACHE_LOCK_DEBUG)
+ set(CXX_COMMON_FLAGS "${CXX_COMMON_FLAGS} -DENABLE_CACHE_LOCK_DEBUG")
+endif()
+
# Enable memory tracker, which allows BE to limit the memory of tasks such as
query, load,
# and compaction,and observe the memory of BE through
be_ip:http_port/MemTracker.
# Adding the option `USE_MEM_TRACKER=OFF sh build.sh` when compiling can turn
off the memory tracker,
diff --git a/be/src/cloud/cloud_tablet.cpp b/be/src/cloud/cloud_tablet.cpp
index c046259b0da..b944db87030 100644
--- a/be/src/cloud/cloud_tablet.cpp
+++ b/be/src/cloud/cloud_tablet.cpp
@@ -433,7 +433,7 @@ void CloudTablet::recycle_cached_data(const
std::vector<RowsetSharedPtr>& rowset
// TODO: Segment::file_cache_key
auto file_key =
Segment::file_cache_key(rs->rowset_id().to_string(), seg_id);
auto* file_cache =
io::FileCacheFactory::instance()->get_by_path(file_key);
- file_cache->remove_if_cached(file_key);
+ file_cache->remove_if_cached_async(file_key);
}
}
}
diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 663e6da08b4..995de99819a 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -925,6 +925,9 @@ DEFINE_mBool(enable_query_like_bloom_filter, "true");
DEFINE_Int32(doris_remote_scanner_thread_pool_thread_num, "48");
// number of s3 scanner thread pool queue size
DEFINE_Int32(doris_remote_scanner_thread_pool_queue_size, "102400");
+DEFINE_mInt64(block_cache_wait_timeout_ms, "1000");
+DEFINE_mInt64(cache_lock_long_tail_threshold, "1000");
+DEFINE_Int64(file_cache_recycle_keys_size, "1000000");
// limit the queue of pending batches which will be sent by a single
nodechannel
DEFINE_mInt64(nodechannel_pending_queue_max_bytes, "67108864");
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 585c4dc45cc..77ef46e2b6f 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -984,6 +984,9 @@ DECLARE_mInt64(nodechannel_pending_queue_max_bytes);
// The batch size for sending data by brpc streaming client
DECLARE_mInt64(brpc_streaming_client_batch_bytes);
+DECLARE_mInt64(block_cache_wait_timeout_ms);
+DECLARE_mInt64(cache_lock_long_tail_threshold);
+DECLARE_Int64(file_cache_recycle_keys_size);
DECLARE_Bool(enable_brpc_builtin_services);
diff --git a/be/src/io/cache/block_file_cache.cpp
b/be/src/io/cache/block_file_cache.cpp
index 814e84e8e21..f2f1f223652 100644
--- a/be/src/io/cache/block_file_cache.cpp
+++ b/be/src/io/cache/block_file_cache.cpp
@@ -119,6 +119,8 @@ BlockFileCache::BlockFileCache(const std::string&
cache_base_path,
_ttl_queue = LRUQueue(std::numeric_limits<int>::max(),
std::numeric_limits<int>::max(),
std::numeric_limits<int>::max());
+ _recycle_keys =
std::make_shared<boost::lockfree::spsc_queue<FileCacheKey>>(
+ config::file_cache_recycle_keys_size);
if (cache_settings.storage == "memory") {
_storage = std::make_unique<MemFileCacheStorage>();
_cache_base_path = "memory";
@@ -163,8 +165,7 @@ FileCacheType BlockFileCache::string_to_cache_type(const
std::string& str) {
BlockFileCache::QueryFileCacheContextHolderPtr
BlockFileCache::get_query_context_holder(
const TUniqueId& query_id) {
- std::lock_guard cache_lock(_mutex);
-
+ SCOPED_CACHE_LOCK(_mutex);
if (!config::enable_file_cache_query_limit) {
return {};
}
@@ -182,7 +183,7 @@ BlockFileCache::QueryFileCacheContextPtr
BlockFileCache::get_query_context(
}
void BlockFileCache::remove_query_context(const TUniqueId& query_id) {
- std::lock_guard cache_lock(_mutex);
+ SCOPED_CACHE_LOCK(_mutex);
const auto& query_iter = _query_map.find(query_id);
if (query_iter != _query_map.end() && query_iter->second.use_count() <= 1)
{
@@ -227,7 +228,7 @@ void BlockFileCache::QueryFileCacheContext::reserve(const
UInt128Wrapper& hash,
}
Status BlockFileCache::initialize() {
- std::lock_guard cache_lock(_mutex);
+ SCOPED_CACHE_LOCK(_mutex);
return initialize_unlocked(cache_lock);
}
@@ -438,7 +439,7 @@ std::string BlockFileCache::clear_file_cache_async() {
int64_t num_cells_to_delete = 0;
int64_t num_files_all = 0;
{
- std::lock_guard cache_lock(_mutex);
+ SCOPED_CACHE_LOCK(_mutex);
if (!_async_clear_file_cache) {
for (auto& [_, offset_to_cell] : _files) {
++num_files_all;
@@ -674,7 +675,7 @@ FileBlocksHolder BlockFileCache::get_or_set(const
UInt128Wrapper& hash, size_t o
CacheContext& context) {
FileBlock::Range range(offset, offset + size - 1);
- std::lock_guard cache_lock(_mutex);
+ SCOPED_CACHE_LOCK(_mutex);
if (auto iter = _key_to_time.find(hash);
context.cache_type == FileCacheType::INDEX && iter !=
_key_to_time.end()) {
context.cache_type = FileCacheType::TTL;
@@ -751,7 +752,7 @@ BlockFileCache::FileBlockCell*
BlockFileCache::add_cell(const UInt128Wrapper& ha
}
size_t BlockFileCache::try_release() {
- std::lock_guard l(_mutex);
+ SCOPED_CACHE_LOCK(_mutex);
std::vector<FileBlockCell*> trash;
for (auto& [hash, blocks] : _files) {
for (auto& [offset, cell] : blocks) {
@@ -763,7 +764,7 @@ size_t BlockFileCache::try_release() {
for (auto& cell : trash) {
FileBlockSPtr file_block = cell->file_block;
std::lock_guard lc(cell->file_block->_mutex);
- remove(file_block, l, lc);
+ remove(file_block, cache_lock, lc);
}
LOG(INFO) << "Released " << trash.size() << " blocks in file cache " <<
_cache_base_path;
return trash.size();
@@ -813,6 +814,18 @@ void
BlockFileCache::remove_file_blocks(std::vector<FileBlockCell*>& to_evict,
std::for_each(to_evict.begin(), to_evict.end(), remove_file_block_if);
}
+// Removes every cell in `to_evict` that still owns a file block, taking each block's
+// own mutex while `cache_lock` is held. Blocks are removed with sync=false, so remove()
+// queues the on-disk deletion onto _recycle_keys for the background thread (falling
+// back to a synchronous delete if that queue is full) instead of deleting inline.
+void BlockFileCache::remove_file_blocks_async(std::vector<FileBlockCell*>& to_evict,
+                                              std::lock_guard<std::mutex>& cache_lock) {
+    for (FileBlockCell* cell : to_evict) {
+        FileBlockSPtr block = cell->file_block;
+        if (!block) {
+            continue; // nothing to remove for an empty cell
+        }
+        std::lock_guard block_lock(block->_mutex);
+        remove(block, cache_lock, block_lock, /*sync=*/false);
+    }
+}
+
void BlockFileCache::remove_file_blocks_and_clean_time_maps(
std::vector<FileBlockCell*>& to_evict, std::lock_guard<std::mutex>&
cache_lock) {
auto remove_file_block_and_clean_time_maps_if = [&](FileBlockCell* cell) {
@@ -1098,7 +1111,7 @@ bool BlockFileCache::remove_if_ttl_file_unlock(const
UInt128Wrapper& file_key, b
}
void BlockFileCache::remove_if_cached(const UInt128Wrapper& file_key) {
- std::lock_guard cache_lock(_mutex);
+ SCOPED_CACHE_LOCK(_mutex);
bool is_ttl_file = remove_if_ttl_file_unlock(file_key, true, cache_lock);
if (!is_ttl_file) {
auto iter = _files.find(file_key);
@@ -1114,6 +1127,23 @@ void BlockFileCache::remove_if_cached(const
UInt128Wrapper& file_key) {
}
}
+// Asynchronous counterpart of remove_if_cached(): under the cache lock, evicts every
+// releasable block of `file_key` through remove_file_blocks_async(), which defers the
+// deletion of the backing files to the background thread. Blocks that are not
+// releasable (presumably still in use — see FileBlockCell::releasable) are left alone.
+void BlockFileCache::remove_if_cached_async(const UInt128Wrapper& file_key) {
+    SCOPED_CACHE_LOCK(_mutex);
+    // Only non-TTL files are handled here; TTL files are left to
+    // remove_if_ttl_file_unlock().
+    if (remove_if_ttl_file_unlock(file_key, true, cache_lock)) {
+        return;
+    }
+    std::vector<FileBlockCell*> to_remove;
+    if (auto iter = _files.find(file_key); iter != _files.end()) {
+        for (auto& [_, cell] : iter->second) {
+            if (cell.releasable()) {
+                to_remove.push_back(&cell);
+            }
+        }
+    }
+    remove_file_blocks_async(to_remove, cache_lock);
+}
+
std::vector<FileCacheType> BlockFileCache::get_other_cache_type(FileCacheType
cur_cache_type) {
switch (cur_cache_type) {
case FileCacheType::INDEX:
@@ -1264,7 +1294,7 @@ bool BlockFileCache::try_reserve_for_lru(const
UInt128Wrapper& hash,
template <class T, class U>
requires IsXLock<T> && IsXLock<U>
-void BlockFileCache::remove(FileBlockSPtr file_block, T& cache_lock, U&
block_lock) {
+void BlockFileCache::remove(FileBlockSPtr file_block, T& cache_lock, U&
block_lock, bool sync) {
auto hash = file_block->get_hash_value();
auto offset = file_block->offset();
auto type = file_block->cache_type();
@@ -1284,9 +1314,24 @@ void BlockFileCache::remove(FileBlockSPtr file_block, T&
cache_lock, U& block_lo
key.offset = offset;
key.meta.type = type;
key.meta.expiration_time = expiration_time;
- Status st = _storage->remove(key);
- if (!st.ok()) {
- LOG_WARNING("").error(st);
+ if (sync) {
+ Status st = _storage->remove(key);
+ if (!st.ok()) {
+ LOG_WARNING("").error(st);
+ }
+ } else {
+ // the file will be deleted in the bottom half
+ // so there will be a window that the file is not in the cache but
still in the storage
+ // but it's ok, because the rowset is stale already
+ // in case something unexpected happen, set the _recycle_keys
queue to zero to fallback
+ bool ret = _recycle_keys->push(key);
+ if (!ret) {
+ LOG_WARNING("Failed to push recycle key to queue, do it
synchronously");
+ Status st = _storage->remove(key);
+ if (!st.ok()) {
+ LOG_WARNING("").error(st);
+ }
+ }
}
}
_cur_cache_size -= file_block->range().size();
@@ -1301,8 +1346,18 @@ void BlockFileCache::remove(FileBlockSPtr file_block, T&
cache_lock, U& block_lo
*_num_removed_blocks << 1;
}
+// Bottom half of the asynchronous stale-rowset removal. run_background_operation()
+// calls it before taking the cache lock; it drains every key queued in _recycle_keys by
+// remove(..., /*sync*/ false) and deletes the corresponding file from storage, logging
+// (but otherwise ignoring) any failure.
+void BlockFileCache::recycle_stale_rowset_async_bottom_half() {
+    for (FileCacheKey key; _recycle_keys->pop(key);) {
+        if (Status st = _storage->remove(key); !st.ok()) {
+            LOG_WARNING("").error(st);
+        }
+    }
+}
+
size_t BlockFileCache::get_used_cache_size(FileCacheType cache_type) const {
- std::lock_guard cache_lock(_mutex);
+ SCOPED_CACHE_LOCK(_mutex);
return get_used_cache_size_unlocked(cache_type, cache_lock);
}
@@ -1312,7 +1367,7 @@ size_t
BlockFileCache::get_used_cache_size_unlocked(FileCacheType cache_type,
}
size_t BlockFileCache::get_available_cache_size(FileCacheType cache_type)
const {
- std::lock_guard cache_lock(_mutex);
+ SCOPED_CACHE_LOCK(_mutex);
return get_available_cache_size_unlocked(cache_type, cache_lock);
}
@@ -1323,7 +1378,7 @@ size_t BlockFileCache::get_available_cache_size_unlocked(
}
size_t BlockFileCache::get_file_blocks_num(FileCacheType cache_type) const {
- std::lock_guard cache_lock(_mutex);
+ SCOPED_CACHE_LOCK(_mutex);
return get_file_blocks_num_unlocked(cache_type, cache_lock);
}
@@ -1407,7 +1462,7 @@ std::string BlockFileCache::LRUQueue::to_string(
}
std::string BlockFileCache::dump_structure(const UInt128Wrapper& hash) {
- std::lock_guard cache_lock(_mutex);
+ SCOPED_CACHE_LOCK(_mutex);
return dump_structure_unlocked(hash, cache_lock);
}
@@ -1425,7 +1480,7 @@ std::string BlockFileCache::dump_structure_unlocked(const
UInt128Wrapper& hash,
}
std::string BlockFileCache::dump_single_cache_type(const UInt128Wrapper& hash,
size_t offset) {
- std::lock_guard cache_lock(_mutex);
+ SCOPED_CACHE_LOCK(_mutex);
return dump_single_cache_type_unlocked(hash, offset, cache_lock);
}
@@ -1488,7 +1543,7 @@ std::string BlockFileCache::reset_capacity(size_t
new_capacity) {
ss << "finish reset_capacity, path=" << _cache_base_path;
auto start_time = steady_clock::time_point();
{
- std::lock_guard cache_lock(_mutex);
+ SCOPED_CACHE_LOCK(_mutex);
if (new_capacity < _capacity && new_capacity < _cur_cache_size) {
int64_t need_remove_size = _cur_cache_size - new_capacity;
auto remove_blocks = [&](LRUQueue& queue) -> int64_t {
@@ -1597,10 +1652,11 @@ void BlockFileCache::run_background_operation() {
break;
}
}
+ recycle_stale_rowset_async_bottom_half();
recycle_deleted_blocks();
// gc
int64_t cur_time = UnixSeconds();
- std::lock_guard cache_lock(_mutex);
+ SCOPED_CACHE_LOCK(_mutex);
while (!_time_to_key.empty()) {
auto begin = _time_to_key.begin();
if (cur_time < begin->first) {
@@ -1646,7 +1702,7 @@ void BlockFileCache::run_background_operation() {
void BlockFileCache::modify_expiration_time(const UInt128Wrapper& hash,
uint64_t new_expiration_time) {
- std::lock_guard cache_lock(_mutex);
+ SCOPED_CACHE_LOCK(_mutex);
// 1. If new_expiration_time is equal to zero
if (new_expiration_time == 0) {
remove_if_ttl_file_unlock(hash, false, cache_lock);
@@ -1711,7 +1767,7 @@ BlockFileCache::get_hot_blocks_meta(const UInt128Wrapper&
hash) const {
int64_t cur_time = std::chrono::duration_cast<std::chrono::seconds>(
std::chrono::steady_clock::now().time_since_epoch())
.count();
- std::lock_guard cache_lock(_mutex);
+ SCOPED_CACHE_LOCK(_mutex);
std::vector<std::tuple<size_t, size_t, FileCacheType, uint64_t>>
blocks_meta;
if (auto iter = _files.find(hash); iter != _files.end()) {
for (auto& pair : _files.find(hash)->second) {
@@ -1780,7 +1836,7 @@ std::string BlockFileCache::clear_file_cache_directly() {
using namespace std::chrono;
std::stringstream ss;
auto start = steady_clock::now();
- std::lock_guard cache_lock(_mutex);
+ SCOPED_CACHE_LOCK(_mutex);
LOG_INFO("start clear_file_cache_directly").tag("path", _cache_base_path);
std::string clear_msg;
@@ -1818,7 +1874,7 @@ std::string BlockFileCache::clear_file_cache_directly() {
std::map<size_t, FileBlockSPtr> BlockFileCache::get_blocks_by_key(const
UInt128Wrapper& hash) {
std::map<size_t, FileBlockSPtr> offset_to_block;
- std::lock_guard cache_lock(_mutex);
+ SCOPED_CACHE_LOCK(_mutex);
if (_files.contains(hash)) {
for (auto& [offset, cell] : _files[hash]) {
if (cell.file_block->state() == FileBlock::State::DOWNLOADED) {
@@ -1833,7 +1889,7 @@ std::map<size_t, FileBlockSPtr>
BlockFileCache::get_blocks_by_key(const UInt128W
}
void BlockFileCache::update_ttl_atime(const UInt128Wrapper& hash) {
- std::lock_guard lock(_mutex);
+ SCOPED_CACHE_LOCK(_mutex);
if (auto iter = _files.find(hash); iter != _files.end()) {
for (auto& [_, cell] : iter->second) {
cell.update_atime();
@@ -1871,5 +1927,5 @@ std::map<std::string, double> BlockFileCache::get_stats()
{
template void BlockFileCache::remove(FileBlockSPtr file_block,
std::lock_guard<std::mutex>& cache_lock,
- std::lock_guard<std::mutex>& block_lock);
+ std::lock_guard<std::mutex>& block_lock,
bool sync);
} // namespace doris::io
diff --git a/be/src/io/cache/block_file_cache.h
b/be/src/io/cache/block_file_cache.h
index 2845994bd60..4bedc725692 100644
--- a/be/src/io/cache/block_file_cache.h
+++ b/be/src/io/cache/block_file_cache.h
@@ -19,6 +19,7 @@
#include <bvar/bvar.h>
+#include <boost/lockfree/spsc_queue.hpp>
#include <memory>
#include <mutex>
#include <optional>
@@ -27,15 +28,51 @@
+#include "common/config.h"
#include "io/cache/file_block.h"
#include "io/cache/file_cache_common.h"
#include "io/cache/file_cache_storage.h"
+#include "util/threadpool.h"
namespace doris::io {
+// SCOPED_CACHE_LOCK(MUTEX) declares `cache_lock`, a std::lock_guard on MUTEX, in the
+// caller's scope so the caller can pass it on to the *_unlocked helpers.
+// Note: the cache_lock is scoped, so do not add do...while(0) here.
+#ifdef ENABLE_CACHE_LOCK_DEBUG
+// Measures how long acquiring the cache lock took and logs the caller's stack when the
+// wait exceeded config::cache_lock_long_tail_threshold ms. The timing state lives in this
+// object rather than in locals declared by the macro, so the macro does not inject (or
+// shadow) generic names such as `start_time` / `duration` in the caller's scope
+// (e.g. reset_capacity() has its own `start_time` right outside its locked block).
+class CacheLockWaitTimer {
+public:
+    CacheLockWaitTimer() : _start(std::chrono::steady_clock::now()) {}
+
+    // Must be called right after the lock has been acquired.
+    void on_acquired() const {
+        auto wait_ms = std::chrono::duration_cast<std::chrono::milliseconds>(
+                               std::chrono::steady_clock::now() - _start)
+                               .count();
+        if (wait_ms > config::cache_lock_long_tail_threshold) {
+            // No std::endl: the log record needs neither an extra newline nor a flush.
+            LOG(WARNING) << "Lock wait time " << wait_ms << "ms. " << get_stack_trace_by_boost();
+        }
+    }
+
+private:
+    std::chrono::steady_clock::time_point _start;
+};
+
+#define SCOPED_CACHE_LOCK(MUTEX)              \
+    CacheLockWaitTimer cache_lock_wait_timer; \
+    std::lock_guard cache_lock(MUTEX);        \
+    cache_lock_wait_timer.on_acquired();      \
+    LockScopedTimer cache_lock_timer;
+#else
+#define SCOPED_CACHE_LOCK(MUTEX) std::lock_guard cache_lock(MUTEX);
+#endif
+
template <class Lock>
concept IsXLock = std::same_as<Lock, std::lock_guard<std::mutex>> ||
std::same_as<Lock, std::unique_lock<std::mutex>>;
class FSFileCacheStorage;
+// RAII timer that SCOPED_CACHE_LOCK places right after the cache lock is taken. On
+// destruction it logs, with a stack trace, when the lock was held for longer than
+// config::cache_lock_long_tail_threshold ms.
+class LockScopedTimer {
+public:
+    LockScopedTimer() : _start(std::chrono::steady_clock::now()) {}
+
+    // Non-copyable: a copy would report the same hold interval twice.
+    LockScopedTimer(const LockScopedTimer&) = delete;
+    LockScopedTimer& operator=(const LockScopedTimer&) = delete;
+
+    ~LockScopedTimer() {
+        auto held_ms = std::chrono::duration_cast<std::chrono::milliseconds>(
+                               std::chrono::steady_clock::now() - _start)
+                               .count();
+        // Use the same runtime-tunable threshold as the lock-wait check instead of a
+        // hard-coded 500ms.
+        if (held_ms > config::cache_lock_long_tail_threshold) {
+            LOG(WARNING) << "Lock held time " << held_ms << "ms. " << get_stack_trace_by_boost();
+        }
+    }
+
+private:
+    std::chrono::steady_clock::time_point _start;
+};
+
// The BlockFileCache is responsible for the management of the blocks
// The current strategies are lru and ttl.
class BlockFileCache {
@@ -119,6 +156,7 @@ public:
// remove all blocks that belong to the key
void remove_if_cached(const UInt128Wrapper& key);
+ void remove_if_cached_async(const UInt128Wrapper& key);
// modify the expiration time about the key
void modify_expiration_time(const UInt128Wrapper& key, uint64_t
new_expiration_time);
@@ -320,7 +358,7 @@ private:
template <class T, class U>
requires IsXLock<T> && IsXLock<U>
- void remove(FileBlockSPtr file_block, T& cache_lock, U& segment_lock);
+ void remove(FileBlockSPtr file_block, T& cache_lock, U& segment_lock, bool
sync = true);
FileBlocks get_impl(const UInt128Wrapper& hash, const CacheContext&
context,
const FileBlock::Range& range,
std::lock_guard<std::mutex>& cache_lock);
@@ -402,12 +440,17 @@ private:
void remove_file_blocks(std::vector<FileBlockCell*>&,
std::lock_guard<std::mutex>&);
+ void remove_file_blocks_async(std::vector<FileBlockCell*>&,
std::lock_guard<std::mutex>&);
+
void remove_file_blocks_and_clean_time_maps(std::vector<FileBlockCell*>&,
std::lock_guard<std::mutex>&);
void find_evict_candidates(LRUQueue& queue, size_t size, size_t
cur_cache_size,
size_t& removed_size,
std::vector<FileBlockCell*>& to_evict,
std::lock_guard<std::mutex>& cache_lock, bool
is_ttl);
+
+ void recycle_stale_rowset_async_bottom_half();
+
// info
std::string _cache_base_path;
size_t _capacity = 0;
@@ -446,6 +489,9 @@ private:
LRUQueue _disposable_queue;
LRUQueue _ttl_queue;
+ // keys for async remove
+ std::shared_ptr<boost::lockfree::spsc_queue<FileCacheKey>> _recycle_keys;
+
// metrics
std::shared_ptr<bvar::Status<size_t>> _cache_capacity_metrics;
std::shared_ptr<bvar::Status<size_t>> _cur_cache_size_metrics;
diff --git a/be/src/io/cache/block_file_cache_profile.cpp
b/be/src/io/cache/block_file_cache_profile.cpp
index 68e6c1433de..1759d37f9e4 100644
--- a/be/src/io/cache/block_file_cache_profile.cpp
+++ b/be/src/io/cache/block_file_cache_profile.cpp
@@ -34,9 +34,9 @@ std::shared_ptr<AtomicStatistics> FileCacheProfile::report() {
}
void FileCacheProfile::update(FileCacheStatistics* stats) {
- {
- std::lock_guard lock(_mtx);
- if (!_profile) {
+ if (_profile == nullptr) {
+ std::lock_guard<std::mutex> lock(_mtx);
+ if (_profile == nullptr) {
_profile = std::make_shared<AtomicStatistics>();
_file_cache_metric = std::make_shared<FileCacheMetric>(this);
_file_cache_metric->register_entity();
diff --git a/be/src/io/cache/file_block.cpp b/be/src/io/cache/file_block.cpp
index b015cbd6111..4576b9dbba8 100644
--- a/be/src/io/cache/file_block.cpp
+++ b/be/src/io/cache/file_block.cpp
@@ -144,7 +144,7 @@ Status FileBlock::append(Slice data) {
Status FileBlock::finalize() {
if (_downloaded_size != 0 && _downloaded_size != _block_range.size()) {
- std::lock_guard cache_lock(_mgr->_mutex);
+ SCOPED_CACHE_LOCK(_mgr->_mutex);
size_t old_size = _block_range.size();
_block_range.right = _block_range.left + _downloaded_size - 1;
size_t new_size = _block_range.size();
@@ -179,7 +179,7 @@ Status
FileBlock::change_cache_type_between_ttl_and_others(FileCacheType new_typ
}
Status FileBlock::change_cache_type_between_normal_and_index(FileCacheType
new_type) {
- std::lock_guard cache_lock(_mgr->_mutex);
+ SCOPED_CACHE_LOCK(_mgr->_mutex);
std::lock_guard block_lock(_mutex);
bool expr = (new_type != FileCacheType::TTL && _key.meta.type !=
FileCacheType::TTL);
if (!expr) {
@@ -223,7 +223,7 @@ FileBlock::State FileBlock::wait() {
if (_download_state == State::DOWNLOADING) {
DCHECK(_downloader_id != 0 && _downloader_id != get_caller_id());
- _cv.wait_for(block_lock, std::chrono::seconds(1));
+ _cv.wait_for(block_lock,
std::chrono::milliseconds(config::block_cache_wait_timeout_ms));
}
return _download_state;
@@ -278,14 +278,24 @@ FileBlocksHolder::~FileBlocksHolder() {
auto& file_block = *current_file_block_it;
BlockFileCache* _mgr = file_block->_mgr;
{
- std::lock_guard cache_lock(_mgr->_mutex);
- std::lock_guard block_lock(file_block->_mutex);
- file_block->complete_unlocked(block_lock);
- if (file_block.use_count() == 2) {
- DCHECK(file_block->state_unlock(block_lock) !=
FileBlock::State::DOWNLOADING);
- // one in cache, one in here
- if (file_block->state_unlock(block_lock) ==
FileBlock::State::EMPTY) {
- _mgr->remove(file_block, cache_lock, block_lock);
+ bool should_remove = false;
+ {
+ std::lock_guard block_lock(file_block->_mutex);
+ file_block->complete_unlocked(block_lock);
+ if (file_block.use_count() == 2 &&
+ file_block->state_unlock(block_lock) ==
FileBlock::State::EMPTY) {
+ should_remove = true;
+ }
+ }
+ if (should_remove) {
+ SCOPED_CACHE_LOCK(_mgr->_mutex);
+ std::lock_guard block_lock(file_block->_mutex);
+ if (file_block.use_count() == 2) {
+ DCHECK(file_block->state_unlock(block_lock) !=
FileBlock::State::DOWNLOADING);
+ // one in cache, one in here
+ if (file_block->state_unlock(block_lock) ==
FileBlock::State::EMPTY) {
+ _mgr->remove(file_block, cache_lock, block_lock);
+ }
}
}
}
diff --git a/be/src/io/cache/fs_file_cache_storage.cpp
b/be/src/io/cache/fs_file_cache_storage.cpp
index ecdf04c8830..bacd0820c66 100644
--- a/be/src/io/cache/fs_file_cache_storage.cpp
+++ b/be/src/io/cache/fs_file_cache_storage.cpp
@@ -471,7 +471,8 @@ void
FSFileCacheStorage::load_cache_info_into_memory(BlockFileCache* _mgr) const
std::vector<BatchLoadArgs> batch_load_buffer;
batch_load_buffer.reserve(scan_length);
auto add_cell_batch_func = [&]() {
- std::lock_guard cache_lock(_mgr->_mutex);
+ SCOPED_CACHE_LOCK(_mgr->_mutex);
+
auto f = [&](const BatchLoadArgs& args) {
// in async load mode, a cell may be added twice.
if (_mgr->_files.contains(args.hash) &&
_mgr->_files[args.hash].contains(args.offset)) {
diff --git a/build.sh b/build.sh
index 8311335cfab..35c989d0b0a 100755
--- a/build.sh
+++ b/build.sh
@@ -442,6 +442,10 @@ if [[ -z "${ENABLE_INJECTION_POINT}" ]]; then
ENABLE_INJECTION_POINT='OFF'
fi
+if [[ -z "${ENABLE_CACHE_LOCK_DEBUG}" ]]; then
+ ENABLE_CACHE_LOCK_DEBUG='OFF'
+fi
+
if [[ -z "${RECORD_COMPILER_SWITCHES}" ]]; then
RECORD_COMPILER_SWITCHES='OFF'
fi
@@ -488,6 +492,7 @@ echo "Get params:
USE_JEMALLOC -- ${USE_JEMALLOC}
USE_BTHREAD_SCANNER -- ${USE_BTHREAD_SCANNER}
ENABLE_INJECTION_POINT -- ${ENABLE_INJECTION_POINT}
+ ENABLE_CACHE_LOCK_DEBUG -- ${ENABLE_CACHE_LOCK_DEBUG}
DENABLE_CLANG_COVERAGE -- ${DENABLE_CLANG_COVERAGE}
DISPLAY_BUILD_TIME -- ${DISPLAY_BUILD_TIME}
ENABLE_PCH -- ${ENABLE_PCH}
@@ -574,6 +579,7 @@ if [[ "${BUILD_BE}" -eq 1 ]]; then
-DCMAKE_EXPORT_COMPILE_COMMANDS=ON \
-DCMAKE_BUILD_TYPE="${CMAKE_BUILD_TYPE}" \
-DENABLE_INJECTION_POINT="${ENABLE_INJECTION_POINT}" \
+ -DENABLE_CACHE_LOCK_DEBUG="${ENABLE_CACHE_LOCK_DEBUG}" \
-DMAKE_TEST=OFF \
-DBUILD_FS_BENCHMARK="${BUILD_FS_BENCHMARK}" \
${CMAKE_USE_CCACHE:+${CMAKE_USE_CCACHE}} \
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]