This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-4.0 by this push:
new 5952975b731 branch-4.0: [fix](filecache) dedup need_update_lru_block
queue to reduce memory consumption (pick#58903) (#59248)
5952975b731 is described below
commit 5952975b73198ec2a244404226dd360aa60fc9e8
Author: zhengyu <[email protected]>
AuthorDate: Tue Dec 23 14:25:52 2025 +0800
branch-4.0: [fix](filecache) dedup need_update_lru_block queue to reduce
memory consumption (pick#58903) (#59248)
### What problem does this PR solve?
Issue Number: close #xxx
Related PR: #xxx
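Problem Summary: `add_need_update_lru_block` pushed another FileBlockSPtr copy into the moodycamel::ConcurrentQueue `_need_update_lru_blocks` on every call, so the queue could hold many copies of the same block and its memory grew with the call rate rather than with the number of distinct blocks. This change replaces the queue with `NeedUpdateLRUBlocks`, a sharded set keyed by the raw FileBlock pointer, so each block is pending at most once; the background thread drains up to the configured batch limit per round and applies the LRU updates under the cache lock.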
### Release note
None
### Check List (For Author)
- Test <!-- At least one of them must be included. -->
- [ ] Regression test
- [ ] Unit Test
- [ ] Manual test (add detailed scripts or steps below)
- [ ] No need to test or manual test. Explain why:
- [ ] This is a refactor/code format and no logic has been changed.
- [ ] Previous test can cover this change.
- [ ] No code files have been changed.
- [ ] Other reason <!-- Add your reason? -->
- Behavior changed:
- [ ] No.
- [ ] Yes. <!-- Explain the behavior change -->
- Does this need documentation?
- [ ] No.
- [ ] Yes. <!-- Add document PR link here. eg:
https://github.com/apache/doris-website/pull/1214 -->
### Check List (For Reviewer who merge this PR)
- [ ] Confirm the release note
- [ ] Confirm test cases
- [ ] Confirm document
- [ ] Add branch pick label <!-- Add branch pick label that this PR
should merge into -->
---------
Signed-off-by: freemandealer <[email protected]>
---
be/src/io/cache/block_file_cache.cpp | 114 +++++++++++++++++----
be/src/io/cache/block_file_cache.h | 47 ++++++++-
.../io/cache/block_file_cache_test_lru_dump.cpp | 23 ++---
be/test/io/cache/need_update_lru_blocks_test.cpp | 111 ++++++++++++++++++++
4 files changed, 263 insertions(+), 32 deletions(-)
diff --git a/be/src/io/cache/block_file_cache.cpp
b/be/src/io/cache/block_file_cache.cpp
index 22608c84ba9..eb26660713a 100644
--- a/be/src/io/cache/block_file_cache.cpp
+++ b/be/src/io/cache/block_file_cache.cpp
@@ -21,6 +21,7 @@
#include "io/cache/block_file_cache.h"
#include <cstdio>
+#include <exception>
#include <fstream>
#include "common/status.h"
@@ -55,6 +56,85 @@
namespace doris::io {
#include "common/compile_check_begin.h"
+// Queue `block` for a deferred LRU update. Blocks are deduplicated by their raw
+// pointer, so repeated calls for the same block keep a single pending entry.
+// Returns true only when the block was newly added. Null blocks return false;
+// exceptions (e.g. allocation failure) are logged and also reported as false.
+bool NeedUpdateLRUBlocks::insert(FileBlockSPtr block) {
+    if (!block) {
+        return false;
+    }
+    try {
+        auto* raw_ptr = block.get();
+        auto& shard = _shards[shard_index(raw_ptr)];
+        std::lock_guard lock(shard.mutex);
+        // try_emplace neither allocates a node nor moves from `block` when the key
+        // is already present; with emplace, a duplicate (the case this class
+        // exists for) still built and destroyed a node under the shard lock.
+        auto [_, inserted] = shard.entries.try_emplace(raw_ptr, std::move(block));
+        if (inserted) {
+            _size.fetch_add(1, std::memory_order_relaxed);
+        }
+        return inserted;
+    } catch (const std::exception& e) {
+        LOG(WARNING) << "Failed to enqueue block for LRU update: " << e.what();
+    } catch (...) {
+        LOG(WARNING) << "Failed to enqueue block for LRU update: unknown error";
+    }
+    return false;
+}
+
+// Move up to `limit` pending blocks into `output` (appending to it) and remove
+// them from the set. Returns the number of blocks appended. Zero `limit` or a
+// null `output` is a no-op. If an exception escapes (e.g. `output` fails to
+// grow), it is logged and every block appended so far is still counted and
+// subtracted from size(), so the structure stays consistent.
+size_t NeedUpdateLRUBlocks::drain(size_t limit, std::vector<FileBlockSPtr>* output) {
+    if (limit == 0 || output == nullptr) {
+        return 0;
+    }
+    size_t drained = 0;
+    try {
+        // size() is only a hint: concurrent inserts may add more entries.
+        output->reserve(output->size() + std::min(limit, size()));
+        for (auto& shard : _shards) {
+            if (drained >= limit) {
+                break;
+            }
+            std::lock_guard lock(shard.mutex);
+            auto it = shard.entries.begin();
+            while (it != shard.entries.end() && drained < limit) {
+                // emplace_back is the only step here that can throw; if it does,
+                // this entry has not been erased and nothing below runs for it.
+                output->emplace_back(std::move(it->second));
+                it = shard.entries.erase(it);
+                // Account per element: batching the subtraction until the end of
+                // the shard would leave _size stale if a later emplace_back threw.
+                ++drained;
+                _size.fetch_sub(1, std::memory_order_relaxed);
+            }
+        }
+    } catch (const std::exception& e) {
+        LOG(WARNING) << "Failed to drain LRU update blocks: " << e.what();
+    } catch (...) {
+        LOG(WARNING) << "Failed to drain LRU update blocks: unknown error";
+    }
+    return drained;
+}
+
+// Drop every pending block. Each shard's map is swapped into a local under the
+// shard lock and destroyed after the lock is released. This keeps the shared_ptr
+// releases out of the critical section and hands the bucket array back to the
+// allocator (unordered_map::clear() does not shrink it in common implementations).
+// Exceptions (mutex::lock may throw std::system_error) are logged, not propagated.
+void NeedUpdateLRUBlocks::clear() {
+    try {
+        for (auto& shard : _shards) {
+            decltype(shard.entries) released;
+            {
+                std::lock_guard lock(shard.mutex);
+                if (shard.entries.empty()) {
+                    continue;
+                }
+                released.swap(shard.entries);
+                _size.fetch_sub(released.size(), std::memory_order_relaxed);
+            }
+            // `released`, and the FileBlockSPtr references it holds, is destroyed
+            // here, after the shard lock has been released.
+        }
+    } catch (const std::exception& e) {
+        LOG(WARNING) << "Failed to clear LRU update blocks: " << e.what();
+    } catch (...) {
+        LOG(WARNING) << "Failed to clear LRU update blocks: unknown error";
+    }
+}
+
+// Map a block pointer to its shard. std::hash<T*> is the identity function in
+// libstdc++ and libc++, and heap-allocated objects share their low (alignment)
+// bits, so masking the raw address would send blocks to only a few shards.
+// Fibonacci hashing (multiply by 2^64/phi) spreads every address bit into the
+// middle/high bits of the product, which are then masked.
+size_t NeedUpdateLRUBlocks::shard_index(FileBlock* ptr) const {
+    DCHECK(ptr != nullptr);
+    constexpr uint64_t kFibonacciMultiplier = 0x9E3779B97F4A7C15ULL;
+    const uint64_t mixed =
+            static_cast<uint64_t>(reinterpret_cast<uintptr_t>(ptr)) * kFibonacciMultiplier;
+    return static_cast<size_t>(mixed >> 32) & kShardMask;
+}
+
BlockFileCache::BlockFileCache(const std::string& cache_base_path,
const FileCacheSettings& cache_settings)
: _cache_base_path(cache_base_path),
@@ -623,11 +703,8 @@ FileBlocks BlockFileCache::get_impl(const UInt128Wrapper&
hash, const CacheConte
}
void BlockFileCache::add_need_update_lru_block(FileBlockSPtr block) {
- bool ret = _need_update_lru_blocks.enqueue(block);
- if (ret) [[likely]] {
- *_need_update_lru_blocks_length_recorder <<
_need_update_lru_blocks.size_approx();
- } else {
- LOG_WARNING("Failed to push FileBlockSPtr to _need_update_lru_blocks");
+ if (_need_update_lru_blocks.insert(std::move(block))) {
+ *_need_update_lru_blocks_length_recorder <<
_need_update_lru_blocks.size();
}
}
@@ -2102,8 +2179,7 @@ void BlockFileCache::run_background_evict_in_advance() {
void BlockFileCache::run_background_block_lru_update() {
Thread::set_self_name("run_background_block_lru_update");
- FileBlockSPtr block;
- size_t batch_count = 0;
+ std::vector<FileBlockSPtr> batch;
while (!_close) {
int64_t interval_ms =
config::file_cache_background_block_lru_update_interval_ms;
size_t batch_limit =
@@ -2116,18 +2192,24 @@ void BlockFileCache::run_background_block_lru_update() {
}
}
+ batch.clear();
+ batch.reserve(batch_limit);
+ size_t drained = _need_update_lru_blocks.drain(batch_limit, &batch);
+ if (drained == 0) {
+ *_need_update_lru_blocks_length_recorder <<
_need_update_lru_blocks.size();
+ continue;
+ }
+
int64_t duration_ns = 0;
{
SCOPED_CACHE_LOCK(_mutex, this);
SCOPED_RAW_TIMER(&duration_ns);
- while (batch_count < batch_limit &&
_need_update_lru_blocks.try_dequeue(block)) {
+ for (auto& block : batch) {
update_block_lru(block, cache_lock);
- batch_count++;
}
}
*_update_lru_blocks_latency_us << (duration_ns / 1000);
- *_need_update_lru_blocks_length_recorder <<
_need_update_lru_blocks.size_approx();
- batch_count = 0;
+ *_need_update_lru_blocks_length_recorder <<
_need_update_lru_blocks.size();
}
}
@@ -2268,14 +2350,8 @@ bool
BlockFileCache::try_reserve_during_async_load(size_t size,
}
void BlockFileCache::clear_need_update_lru_blocks() {
- constexpr size_t kBatchSize = 1024;
- std::vector<FileBlockSPtr> buffer(kBatchSize);
- size_t drained = 0;
- while ((drained = _need_update_lru_blocks.try_dequeue_bulk(buffer.data(),
buffer.size())) > 0) {
- for (size_t i = 0; i < drained; ++i) {
- buffer[i].reset();
- }
- }
+ _need_update_lru_blocks.clear();
+ *_need_update_lru_blocks_length_recorder << _need_update_lru_blocks.size();
}
std::string BlockFileCache::clear_file_cache_directly() {
diff --git a/be/src/io/cache/block_file_cache.h
b/be/src/io/cache/block_file_cache.h
index 14399325565..7527186c749 100644
--- a/be/src/io/cache/block_file_cache.h
+++ b/be/src/io/cache/block_file_cache.h
@@ -21,11 +21,16 @@
#include <concurrentqueue.h>
#include <algorithm>
+#include <array>
+#include <atomic>
#include <boost/lockfree/spsc_queue.hpp>
+#include <functional>
#include <memory>
#include <mutex>
#include <optional>
#include <thread>
+#include <unordered_map>
+#include <vector>
#include "io/cache/cache_lru_dumper.h"
#include "io/cache/file_block.h"
@@ -73,6 +78,46 @@ private:
class FSFileCacheStorage;
+// NeedUpdateLRUBlocks holds FileBlockSPtr entries that are waiting for a
+// deferred LRU update, in a deduplicated, sharded container. Entries are keyed
+// by the raw FileBlock pointer, so multiple shared_ptr copies of the same block
+// collapse into a single pending update. All public methods are thread-safe
+// (per-shard mutexes plus an atomic size); the container targets many
+// concurrent inserters and a background thread that drains it in batches.
+// Because updates are applied in batches, the drain order is unspecified.
+class NeedUpdateLRUBlocks {
+public:
+    NeedUpdateLRUBlocks() = default;
+    // Shards own mutexes, so the container is neither copyable nor movable.
+    NeedUpdateLRUBlocks(const NeedUpdateLRUBlocks&) = delete;
+    NeedUpdateLRUBlocks& operator=(const NeedUpdateLRUBlocks&) = delete;
+
+    // Insert a block into the pending set. Returns true only when the block
+    // was not already queued. Null inputs are ignored (return false).
+    bool insert(FileBlockSPtr block);
+
+    // Append up to `limit` unique blocks to `output` and remove them from the
+    // set. Returns how many blocks were appended and shrinks the internal size
+    // accordingly.
+    size_t drain(size_t limit, std::vector<FileBlockSPtr>* output);
+
+    // Remove every pending block from the structure and reset the size.
+    void clear();
+
+    // Number of queued unique blocks. Loaded with relaxed ordering, so it is
+    // only approximate while other threads insert or drain concurrently.
+    size_t size() const { return _size.load(std::memory_order_relaxed); }
+
+private:
+    static constexpr size_t kShardCount = 64;
+    // kShardMask is only a valid modulus when kShardCount is a power of two.
+    static_assert((kShardCount & (kShardCount - 1)) == 0, "kShardCount must be a power of two");
+    static constexpr size_t kShardMask = kShardCount - 1;
+
+    // Shards are stored back to back in _shards; aligning each one to a 64-byte
+    // cache line keeps threads that lock neighbouring shards from false-sharing
+    // a line that holds both mutexes.
+    struct alignas(64) Shard {
+        std::mutex mutex;
+        std::unordered_map<FileBlock*, FileBlockSPtr> entries;
+    };
+
+    size_t shard_index(FileBlock* ptr) const;
+
+    std::array<Shard, kShardCount> _shards;
+    std::atomic<size_t> _size {0};
+};
+
// The BlockFileCache is responsible for the management of the blocks
// The current strategies are lru and ttl.
@@ -570,7 +615,7 @@ private:
std::unique_ptr<FileCacheStorage> _storage;
std::shared_ptr<bvar::LatencyRecorder> _lru_dump_latency_us;
std::mutex _dump_lru_queues_mtx;
- moodycamel::ConcurrentQueue<FileBlockSPtr> _need_update_lru_blocks;
+ NeedUpdateLRUBlocks _need_update_lru_blocks;
};
} // namespace doris::io
diff --git a/be/test/io/cache/block_file_cache_test_lru_dump.cpp
b/be/test/io/cache/block_file_cache_test_lru_dump.cpp
index 99c2c780bed..65762b216a0 100644
--- a/be/test/io/cache/block_file_cache_test_lru_dump.cpp
+++ b/be/test/io/cache/block_file_cache_test_lru_dump.cpp
@@ -534,13 +534,19 @@ TEST_F(BlockFileCacheTest,
cached_remote_file_reader_direct_read_order_check) {
// read
ASSERT_TRUE(reader->read_at(0, Slice(buffer.data(), buffer.size()),
&bytes_read, &io_ctx).ok());
+ // order are updated in batch, so wait the former batch complete
+ std::this_thread::sleep_for(std::chrono::milliseconds(
+ 2 * config::file_cache_background_block_lru_update_interval_ms));
ASSERT_TRUE(
reader->read_at(1024 * 1024, Slice(buffer.data(), buffer.size()),
&bytes_read, &io_ctx)
.ok());
+ std::this_thread::sleep_for(std::chrono::milliseconds(
+ 2 * config::file_cache_background_block_lru_update_interval_ms));
ASSERT_TRUE(reader->read_at(1024 * 1024 * 2, Slice(buffer.data(),
buffer.size()), &bytes_read,
&io_ctx)
.ok());
-
+ std::this_thread::sleep_for(std::chrono::milliseconds(
+ 2 * config::file_cache_background_block_lru_update_interval_ms));
// check inital order
std::vector<size_t> initial_offsets;
for (auto it = cache->_normal_queue.begin(); it !=
cache->_normal_queue.end(); ++it) {
@@ -555,21 +561,14 @@ TEST_F(BlockFileCacheTest,
cached_remote_file_reader_direct_read_order_check) {
ASSERT_TRUE(reader->read_at(1024 * 1024 * 2, Slice(buffer.data(),
buffer.size()), &bytes_read,
&io_ctx)
.ok());
+ std::this_thread::sleep_for(std::chrono::milliseconds(
+ 2 * config::file_cache_background_block_lru_update_interval_ms));
ASSERT_TRUE(
reader->read_at(1024 * 1024, Slice(buffer.data(), buffer.size()),
&bytes_read, &io_ctx)
.ok());
+ std::this_thread::sleep_for(std::chrono::milliseconds(
+ 2 * config::file_cache_background_block_lru_update_interval_ms));
ASSERT_TRUE(reader->read_at(0, Slice(buffer.data(), buffer.size()),
&bytes_read, &io_ctx).ok());
-
- std::vector<size_t> before_updated_offsets;
- for (auto it = cache->_normal_queue.begin(); it !=
cache->_normal_queue.end(); ++it) {
- before_updated_offsets.push_back(it->offset);
- }
- ASSERT_EQ(before_updated_offsets.size(), 3);
- ASSERT_EQ(before_updated_offsets[0], 0);
- ASSERT_EQ(before_updated_offsets[1], 1024 * 1024);
- ASSERT_EQ(before_updated_offsets[2], 1024 * 1024 * 2);
-
- // wait LRU update
std::this_thread::sleep_for(std::chrono::milliseconds(
2 * config::file_cache_background_block_lru_update_interval_ms));
diff --git a/be/test/io/cache/need_update_lru_blocks_test.cpp
b/be/test/io/cache/need_update_lru_blocks_test.cpp
new file mode 100644
index 00000000000..31b6eccab25
--- /dev/null
+++ b/be/test/io/cache/need_update_lru_blocks_test.cpp
@@ -0,0 +1,111 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <gtest/gtest.h>
+
+#include <memory>
+#include <vector>
+
+#include "io/cache/block_file_cache.h"
+
+namespace doris::io {
+namespace {
+
+// Build a standalone NORMAL-type block; every `idx` yields a distinct cache key.
+FileBlockSPtr create_block(int idx) {
+    FileCacheKey key;
+    key.hash = UInt128Wrapper(vectorized::UInt128(static_cast<uint64_t>(idx + 1)));
+    key.offset = static_cast<size_t>(idx) * 16;
+    key.meta.expiration_time = 0;
+    key.meta.type = FileCacheType::NORMAL;
+    return std::make_shared<FileBlock>(key, /*size*/ 1, /*mgr*/ nullptr, FileBlock::State::EMPTY);
+}
+
+// Insert `count` fresh blocks starting at `start_idx`. Callers wrap this in
+// ASSERT_NO_FATAL_FAILURE: an ASSERT_* failure inside a helper only returns
+// from the helper, not from the calling test.
+void insert_blocks(NeedUpdateLRUBlocks* pending, int count, int start_idx = 0) {
+    for (int i = 0; i < count; ++i) {
+        ASSERT_TRUE(pending->insert(create_block(start_idx + i)))
+                << "Block " << (start_idx + i) << " should be inserted";
+    }
+}
+
+} // namespace
+
+TEST(NeedUpdateLRUBlocksTest, InsertRejectsNullAndDeduplicates) {
+    NeedUpdateLRUBlocks pending;
+    FileBlockSPtr null_block;
+    EXPECT_FALSE(pending.insert(null_block));
+    EXPECT_EQ(0u, pending.size());
+
+    auto block = create_block(0);
+    EXPECT_TRUE(pending.insert(block));
+    EXPECT_EQ(1u, pending.size());
+
+    EXPECT_FALSE(pending.insert(block)) << "Same pointer should not enqueue twice";
+    EXPECT_EQ(1u, pending.size());
+}
+
+TEST(NeedUpdateLRUBlocksTest, DrainHandlesZeroLimitAndNullOutput) {
+    NeedUpdateLRUBlocks pending;
+    ASSERT_NO_FATAL_FAILURE(insert_blocks(&pending, 3));
+    std::vector<FileBlockSPtr> drained;
+
+    EXPECT_EQ(0u, pending.drain(0, &drained));
+    EXPECT_TRUE(drained.empty());
+    EXPECT_EQ(3u, pending.size());
+
+    EXPECT_EQ(0u, pending.drain(2, nullptr));
+    EXPECT_EQ(3u, pending.size());
+}
+
+TEST(NeedUpdateLRUBlocksTest, DrainRespectsLimitAndLeavesRemainder) {
+    NeedUpdateLRUBlocks pending;
+    ASSERT_NO_FATAL_FAILURE(insert_blocks(&pending, 5));
+    std::vector<FileBlockSPtr> drained;
+
+    size_t drained_now = pending.drain(2, &drained);
+    EXPECT_EQ(2u, drained_now);
+    EXPECT_EQ(2u, drained.size());
+    EXPECT_EQ(3u, pending.size());
+
+    drained_now = pending.drain(10, &drained);
+    EXPECT_EQ(3u, drained_now);
+    EXPECT_EQ(5u, drained.size());
+    EXPECT_EQ(0u, pending.size());
+}
+
+TEST(NeedUpdateLRUBlocksTest, DrainReturnsEveryBlockExactlyOnce) {
+    // Enough blocks that many shards are populated.
+    constexpr int kCount = 1000;
+    NeedUpdateLRUBlocks pending;
+    ASSERT_NO_FATAL_FAILURE(insert_blocks(&pending, kCount));
+    EXPECT_EQ(static_cast<size_t>(kCount), pending.size());
+
+    std::vector<FileBlockSPtr> drained;
+    EXPECT_EQ(static_cast<size_t>(kCount), pending.drain(2 * kCount, &drained));
+    EXPECT_EQ(static_cast<size_t>(kCount), drained.size());
+    EXPECT_EQ(0u, pending.size());
+
+    // A fresh set accepts only distinct pointers, so every insert succeeding
+    // proves drain() produced no duplicates.
+    NeedUpdateLRUBlocks unique_check;
+    for (const auto& block : drained) {
+        EXPECT_TRUE(unique_check.insert(block));
+    }
+}
+
+TEST(NeedUpdateLRUBlocksTest, BlockCanBeRequeuedAfterDrain) {
+    NeedUpdateLRUBlocks pending;
+    auto block = create_block(0);
+    ASSERT_TRUE(pending.insert(block));
+
+    std::vector<FileBlockSPtr> drained;
+    ASSERT_EQ(1u, pending.drain(1, &drained));
+    EXPECT_EQ(block, drained.front());
+    EXPECT_EQ(0u, pending.size());
+
+    EXPECT_TRUE(pending.insert(block)) << "A drained block must be accepted again";
+    EXPECT_EQ(1u, pending.size());
+}
+
+TEST(NeedUpdateLRUBlocksTest, DrainFromEmptyReturnsZero) {
+    NeedUpdateLRUBlocks pending;
+    std::vector<FileBlockSPtr> drained;
+    EXPECT_EQ(0u, pending.drain(4, &drained));
+    EXPECT_TRUE(drained.empty());
+}
+
+TEST(NeedUpdateLRUBlocksTest, ClearIsIdempotent) {
+    NeedUpdateLRUBlocks pending;
+    pending.clear();
+    EXPECT_EQ(0u, pending.size());
+
+    ASSERT_NO_FATAL_FAILURE(insert_blocks(&pending, 4));
+    EXPECT_EQ(4u, pending.size());
+
+    pending.clear();
+    EXPECT_EQ(0u, pending.size());
+    pending.clear();
+    EXPECT_EQ(0u, pending.size());
+
+    std::vector<FileBlockSPtr> drained;
+    EXPECT_EQ(0u, pending.drain(4, &drained));
+}
+
+} // namespace doris::io
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]