This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-4.0 by this push:
     new 5952975b731 branch-4.0: [fix](filecache) dedup need_update_lru_block 
queue to reduce memory consumption (pick#58903) (#59248)
5952975b731 is described below

commit 5952975b73198ec2a244404226dd360aa60fc9e8
Author: zhengyu <[email protected]>
AuthorDate: Tue Dec 23 14:25:52 2025 +0800

    branch-4.0: [fix](filecache) dedup need_update_lru_block queue to reduce 
memory consumption (pick#58903) (#59248)
    
    ### What problem does this PR solve?
    
    Issue Number: close #xxx
    
    Related PR: #xxx
    
    Problem Summary:
    
    ### Release note
    
    None
    
    ### Check List (For Author)
    
    - Test <!-- At least one of them must be included. -->
        - [ ] Regression test
        - [ ] Unit Test
        - [ ] Manual test (add detailed scripts or steps below)
        - [ ] No need to test or manual test. Explain why:
    - [ ] This is a refactor/code format and no logic has been changed.
            - [ ] Previous test can cover this change.
            - [ ] No code files have been changed.
            - [ ] Other reason <!-- Add your reason?  -->
    
    - Behavior changed:
        - [ ] No.
        - [ ] Yes. <!-- Explain the behavior change -->
    
    - Does this need documentation?
        - [ ] No.
    - [ ] Yes. <!-- Add document PR link here. eg:
    https://github.com/apache/doris-website/pull/1214 -->
    
    ### Check List (For Reviewer who merge this PR)
    
    - [ ] Confirm the release note
    - [ ] Confirm test cases
    - [ ] Confirm document
    - [ ] Add branch pick label <!-- Add branch pick label that this PR
    should merge into -->
    
    ---------
    
    Signed-off-by: freemandealer <[email protected]>
---
 be/src/io/cache/block_file_cache.cpp               | 114 +++++++++++++++++----
 be/src/io/cache/block_file_cache.h                 |  47 ++++++++-
 .../io/cache/block_file_cache_test_lru_dump.cpp    |  23 ++---
 be/test/io/cache/need_update_lru_blocks_test.cpp   | 111 ++++++++++++++++++++
 4 files changed, 263 insertions(+), 32 deletions(-)

diff --git a/be/src/io/cache/block_file_cache.cpp 
b/be/src/io/cache/block_file_cache.cpp
index 22608c84ba9..eb26660713a 100644
--- a/be/src/io/cache/block_file_cache.cpp
+++ b/be/src/io/cache/block_file_cache.cpp
@@ -21,6 +21,7 @@
 #include "io/cache/block_file_cache.h"
 
 #include <cstdio>
+#include <exception>
 #include <fstream>
 
 #include "common/status.h"
@@ -55,6 +56,85 @@
 namespace doris::io {
 #include "common/compile_check_begin.h"
 
+// Insert a block pointer into one shard while swallowing allocation failures.
+bool NeedUpdateLRUBlocks::insert(FileBlockSPtr block) {
+    if (!block) {
+        return false;
+    }
+    try {
+        auto* raw_ptr = block.get();
+        auto idx = shard_index(raw_ptr);
+        auto& shard = _shards[idx];
+        std::lock_guard lock(shard.mutex);
+        auto [_, inserted] = shard.entries.emplace(raw_ptr, std::move(block));
+        if (inserted) {
+            _size.fetch_add(1, std::memory_order_relaxed);
+        }
+        return inserted;
+    } catch (const std::exception& e) {
+        LOG(WARNING) << "Failed to enqueue block for LRU update: " << e.what();
+    } catch (...) {
+        LOG(WARNING) << "Failed to enqueue block for LRU update: unknown 
error";
+    }
+    return false;
+}
+
+// Drain up to `limit` unique blocks to the caller, keeping the structure 
consistent on failures.
+size_t NeedUpdateLRUBlocks::drain(size_t limit, std::vector<FileBlockSPtr>* 
output) {
+    if (limit == 0 || output == nullptr) {
+        return 0;
+    }
+    size_t drained = 0;
+    try {
+        output->reserve(output->size() + std::min(limit, size()));
+        for (auto& shard : _shards) {
+            if (drained >= limit) {
+                break;
+            }
+            std::lock_guard lock(shard.mutex);
+            auto it = shard.entries.begin();
+            size_t shard_drained = 0;
+            while (it != shard.entries.end() && drained + shard_drained < 
limit) {
+                output->emplace_back(std::move(it->second));
+                it = shard.entries.erase(it);
+                ++shard_drained;
+            }
+            if (shard_drained > 0) {
+                _size.fetch_sub(shard_drained, std::memory_order_relaxed);
+                drained += shard_drained;
+            }
+        }
+    } catch (const std::exception& e) {
+        LOG(WARNING) << "Failed to drain LRU update blocks: " << e.what();
+    } catch (...) {
+        LOG(WARNING) << "Failed to drain LRU update blocks: unknown error";
+    }
+    return drained;
+}
+
+// Remove every pending block, guarding against unexpected exceptions.
+void NeedUpdateLRUBlocks::clear() {
+    try {
+        for (auto& shard : _shards) {
+            std::lock_guard lock(shard.mutex);
+            if (!shard.entries.empty()) {
+                auto removed = shard.entries.size();
+                shard.entries.clear();
+                _size.fetch_sub(removed, std::memory_order_relaxed);
+            }
+        }
+    } catch (const std::exception& e) {
+        LOG(WARNING) << "Failed to clear LRU update blocks: " << e.what();
+    } catch (...) {
+        LOG(WARNING) << "Failed to clear LRU update blocks: unknown error";
+    }
+}
+
+size_t NeedUpdateLRUBlocks::shard_index(FileBlock* ptr) const {
+    DCHECK(ptr != nullptr);
+    return std::hash<FileBlock*> {}(ptr)&kShardMask;
+}
+
 BlockFileCache::BlockFileCache(const std::string& cache_base_path,
                                const FileCacheSettings& cache_settings)
         : _cache_base_path(cache_base_path),
@@ -623,11 +703,8 @@ FileBlocks BlockFileCache::get_impl(const UInt128Wrapper& 
hash, const CacheConte
 }
 
 void BlockFileCache::add_need_update_lru_block(FileBlockSPtr block) {
-    bool ret = _need_update_lru_blocks.enqueue(block);
-    if (ret) [[likely]] {
-        *_need_update_lru_blocks_length_recorder << 
_need_update_lru_blocks.size_approx();
-    } else {
-        LOG_WARNING("Failed to push FileBlockSPtr to _need_update_lru_blocks");
+    if (_need_update_lru_blocks.insert(std::move(block))) {
+        *_need_update_lru_blocks_length_recorder << 
_need_update_lru_blocks.size();
     }
 }
 
@@ -2102,8 +2179,7 @@ void BlockFileCache::run_background_evict_in_advance() {
 
 void BlockFileCache::run_background_block_lru_update() {
     Thread::set_self_name("run_background_block_lru_update");
-    FileBlockSPtr block;
-    size_t batch_count = 0;
+    std::vector<FileBlockSPtr> batch;
     while (!_close) {
         int64_t interval_ms = 
config::file_cache_background_block_lru_update_interval_ms;
         size_t batch_limit =
@@ -2116,18 +2192,24 @@ void BlockFileCache::run_background_block_lru_update() {
             }
         }
 
+        batch.clear();
+        batch.reserve(batch_limit);
+        size_t drained = _need_update_lru_blocks.drain(batch_limit, &batch);
+        if (drained == 0) {
+            *_need_update_lru_blocks_length_recorder << 
_need_update_lru_blocks.size();
+            continue;
+        }
+
         int64_t duration_ns = 0;
         {
             SCOPED_CACHE_LOCK(_mutex, this);
             SCOPED_RAW_TIMER(&duration_ns);
-            while (batch_count < batch_limit && 
_need_update_lru_blocks.try_dequeue(block)) {
+            for (auto& block : batch) {
                 update_block_lru(block, cache_lock);
-                batch_count++;
             }
         }
         *_update_lru_blocks_latency_us << (duration_ns / 1000);
-        *_need_update_lru_blocks_length_recorder << 
_need_update_lru_blocks.size_approx();
-        batch_count = 0;
+        *_need_update_lru_blocks_length_recorder << 
_need_update_lru_blocks.size();
     }
 }
 
@@ -2268,14 +2350,8 @@ bool 
BlockFileCache::try_reserve_during_async_load(size_t size,
 }
 
 void BlockFileCache::clear_need_update_lru_blocks() {
-    constexpr size_t kBatchSize = 1024;
-    std::vector<FileBlockSPtr> buffer(kBatchSize);
-    size_t drained = 0;
-    while ((drained = _need_update_lru_blocks.try_dequeue_bulk(buffer.data(), 
buffer.size())) > 0) {
-        for (size_t i = 0; i < drained; ++i) {
-            buffer[i].reset();
-        }
-    }
+    _need_update_lru_blocks.clear();
+    *_need_update_lru_blocks_length_recorder << _need_update_lru_blocks.size();
 }
 
 std::string BlockFileCache::clear_file_cache_directly() {
diff --git a/be/src/io/cache/block_file_cache.h 
b/be/src/io/cache/block_file_cache.h
index 14399325565..7527186c749 100644
--- a/be/src/io/cache/block_file_cache.h
+++ b/be/src/io/cache/block_file_cache.h
@@ -21,11 +21,16 @@
 #include <concurrentqueue.h>
 
 #include <algorithm>
+#include <array>
+#include <atomic>
 #include <boost/lockfree/spsc_queue.hpp>
+#include <functional>
 #include <memory>
 #include <mutex>
 #include <optional>
 #include <thread>
+#include <unordered_map>
+#include <vector>
 
 #include "io/cache/cache_lru_dumper.h"
 #include "io/cache/file_block.h"
@@ -73,6 +78,46 @@ private:
 
 class FSFileCacheStorage;
 
+// NeedUpdateLRUBlocks keeps FileBlockSPtr entries that require LRU updates in 
a
+// deduplicated, sharded container. Entries are keyed by the raw FileBlock
+// pointer so that multiple shared_ptr copies of the same block are treated as 
a
+// single pending update. The structure is thread-safe and optimized for high
+// contention insert/drain workloads in the background update thread.
+// Note that Blocks are updated in batch, internal order is not important.
+class NeedUpdateLRUBlocks {
+public:
+    NeedUpdateLRUBlocks() = default;
+
+    // Insert a block into the pending set. Returns true only when the block
+    // was not already queued. Null inputs are ignored.
+    bool insert(FileBlockSPtr block);
+
+    // Drain up to `limit` unique blocks into `output`. The method returns how
+    // many blocks were actually drained and shrinks the internal size
+    // accordingly.
+    size_t drain(size_t limit, std::vector<FileBlockSPtr>* output);
+
+    // Remove every pending block from the structure and reset the size.
+    void clear();
+
+    // Thread-safe approximate size of queued unique blocks.
+    size_t size() const { return _size.load(std::memory_order_relaxed); }
+
+private:
+    static constexpr size_t kShardCount = 64;
+    static constexpr size_t kShardMask = kShardCount - 1;
+
+    struct Shard {
+        std::mutex mutex;
+        std::unordered_map<FileBlock*, FileBlockSPtr> entries;
+    };
+
+    size_t shard_index(FileBlock* ptr) const;
+
+    std::array<Shard, kShardCount> _shards;
+    std::atomic<size_t> _size {0};
+};
+
 // The BlockFileCache is responsible for the management of the blocks
 // The current strategies are lru and ttl.
 
@@ -570,7 +615,7 @@ private:
     std::unique_ptr<FileCacheStorage> _storage;
     std::shared_ptr<bvar::LatencyRecorder> _lru_dump_latency_us;
     std::mutex _dump_lru_queues_mtx;
-    moodycamel::ConcurrentQueue<FileBlockSPtr> _need_update_lru_blocks;
+    NeedUpdateLRUBlocks _need_update_lru_blocks;
 };
 
 } // namespace doris::io
diff --git a/be/test/io/cache/block_file_cache_test_lru_dump.cpp 
b/be/test/io/cache/block_file_cache_test_lru_dump.cpp
index 99c2c780bed..65762b216a0 100644
--- a/be/test/io/cache/block_file_cache_test_lru_dump.cpp
+++ b/be/test/io/cache/block_file_cache_test_lru_dump.cpp
@@ -534,13 +534,19 @@ TEST_F(BlockFileCacheTest, 
cached_remote_file_reader_direct_read_order_check) {
 
     // read
     ASSERT_TRUE(reader->read_at(0, Slice(buffer.data(), buffer.size()), 
&bytes_read, &io_ctx).ok());
+    // order are updated in batch, so wait the former batch complete
+    std::this_thread::sleep_for(std::chrono::milliseconds(
+            2 * config::file_cache_background_block_lru_update_interval_ms));
     ASSERT_TRUE(
             reader->read_at(1024 * 1024, Slice(buffer.data(), buffer.size()), 
&bytes_read, &io_ctx)
                     .ok());
+    std::this_thread::sleep_for(std::chrono::milliseconds(
+            2 * config::file_cache_background_block_lru_update_interval_ms));
     ASSERT_TRUE(reader->read_at(1024 * 1024 * 2, Slice(buffer.data(), 
buffer.size()), &bytes_read,
                                 &io_ctx)
                         .ok());
-
+    std::this_thread::sleep_for(std::chrono::milliseconds(
+            2 * config::file_cache_background_block_lru_update_interval_ms));
     // check inital order
     std::vector<size_t> initial_offsets;
     for (auto it = cache->_normal_queue.begin(); it != 
cache->_normal_queue.end(); ++it) {
@@ -555,21 +561,14 @@ TEST_F(BlockFileCacheTest, 
cached_remote_file_reader_direct_read_order_check) {
     ASSERT_TRUE(reader->read_at(1024 * 1024 * 2, Slice(buffer.data(), 
buffer.size()), &bytes_read,
                                 &io_ctx)
                         .ok());
+    std::this_thread::sleep_for(std::chrono::milliseconds(
+            2 * config::file_cache_background_block_lru_update_interval_ms));
     ASSERT_TRUE(
             reader->read_at(1024 * 1024, Slice(buffer.data(), buffer.size()), 
&bytes_read, &io_ctx)
                     .ok());
+    std::this_thread::sleep_for(std::chrono::milliseconds(
+            2 * config::file_cache_background_block_lru_update_interval_ms));
     ASSERT_TRUE(reader->read_at(0, Slice(buffer.data(), buffer.size()), 
&bytes_read, &io_ctx).ok());
-
-    std::vector<size_t> before_updated_offsets;
-    for (auto it = cache->_normal_queue.begin(); it != 
cache->_normal_queue.end(); ++it) {
-        before_updated_offsets.push_back(it->offset);
-    }
-    ASSERT_EQ(before_updated_offsets.size(), 3);
-    ASSERT_EQ(before_updated_offsets[0], 0);
-    ASSERT_EQ(before_updated_offsets[1], 1024 * 1024);
-    ASSERT_EQ(before_updated_offsets[2], 1024 * 1024 * 2);
-
-    // wait LRU update
     std::this_thread::sleep_for(std::chrono::milliseconds(
             2 * config::file_cache_background_block_lru_update_interval_ms));
 
diff --git a/be/test/io/cache/need_update_lru_blocks_test.cpp 
b/be/test/io/cache/need_update_lru_blocks_test.cpp
new file mode 100644
index 00000000000..31b6eccab25
--- /dev/null
+++ b/be/test/io/cache/need_update_lru_blocks_test.cpp
@@ -0,0 +1,111 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <gtest/gtest.h>
+
+#include <memory>
+#include <vector>
+
+#include "io/cache/block_file_cache.h"
+
+namespace doris::io {
+namespace {
+
+FileBlockSPtr create_block(int idx) {
+    FileCacheKey key;
+    key.hash = UInt128Wrapper(vectorized::UInt128(static_cast<uint64_t>(idx + 
1)));
+    key.offset = static_cast<size_t>(idx * 16);
+    key.meta.expiration_time = 0;
+    key.meta.type = FileCacheType::NORMAL;
+    return std::make_shared<FileBlock>(key, /*size*/ 1, /*mgr*/ nullptr, 
FileBlock::State::EMPTY);
+}
+
+void insert_blocks(NeedUpdateLRUBlocks* pending, int count, int start_idx = 0) 
{
+    for (int i = 0; i < count; ++i) {
+        ASSERT_TRUE(pending->insert(create_block(start_idx + i)))
+                << "Block " << (start_idx + i) << " should be inserted";
+    }
+}
+
+} // namespace
+
+TEST(NeedUpdateLRUBlocksTest, InsertRejectsNullAndDeduplicates) {
+    NeedUpdateLRUBlocks pending;
+    FileBlockSPtr null_block;
+    EXPECT_FALSE(pending.insert(null_block));
+    EXPECT_EQ(0, pending.size());
+
+    auto block = create_block(0);
+    EXPECT_TRUE(pending.insert(block));
+    EXPECT_EQ(1, pending.size());
+
+    EXPECT_FALSE(pending.insert(block)) << "Same pointer should not enqueue 
twice";
+    EXPECT_EQ(1, pending.size());
+}
+
+TEST(NeedUpdateLRUBlocksTest, DrainHandlesZeroLimitAndNullOutput) {
+    NeedUpdateLRUBlocks pending;
+    insert_blocks(&pending, 3);
+    std::vector<FileBlockSPtr> drained;
+
+    EXPECT_EQ(0, pending.drain(0, &drained));
+    EXPECT_TRUE(drained.empty());
+    EXPECT_EQ(3, pending.size());
+
+    EXPECT_EQ(0, pending.drain(2, nullptr));
+    EXPECT_EQ(3, pending.size());
+}
+
+TEST(NeedUpdateLRUBlocksTest, DrainRespectsLimitAndLeavesRemainder) {
+    NeedUpdateLRUBlocks pending;
+    insert_blocks(&pending, 5);
+    std::vector<FileBlockSPtr> drained;
+
+    size_t drained_now = pending.drain(2, &drained);
+    EXPECT_EQ(2u, drained_now);
+    EXPECT_EQ(2u, drained.size());
+    EXPECT_EQ(3u, pending.size());
+
+    drained_now = pending.drain(10, &drained);
+    EXPECT_EQ(3u, drained_now);
+    EXPECT_EQ(5u, drained.size());
+    EXPECT_EQ(0u, pending.size());
+}
+
+TEST(NeedUpdateLRUBlocksTest, DrainFromEmptyReturnsZero) {
+    NeedUpdateLRUBlocks pending;
+    std::vector<FileBlockSPtr> drained;
+    EXPECT_EQ(0u, pending.drain(4, &drained));
+    EXPECT_TRUE(drained.empty());
+}
+
+TEST(NeedUpdateLRUBlocksTest, ClearIsIdempotent) {
+    NeedUpdateLRUBlocks pending;
+    pending.clear();
+    EXPECT_EQ(0u, pending.size());
+
+    insert_blocks(&pending, 4);
+    EXPECT_EQ(4u, pending.size());
+
+    pending.clear();
+    EXPECT_EQ(0u, pending.size());
+
+    std::vector<FileBlockSPtr> drained;
+    EXPECT_EQ(0u, pending.drain(4, &drained));
+}
+
+} // namespace doris::io


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to