This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-4.0 by this push:
new 4c3a586b6ae branch-4.0: [fix](packed-file) Fix packed file cache cleanup issue (#59892) (#60052)
4c3a586b6ae is described below
commit 4c3a586b6ae451761f9b6d43c62af03bce6d0a8f
Author: Xin Liao <[email protected]>
AuthorDate: Tue Jan 20 17:30:56 2026 +0800
branch-4.0: [fix](packed-file) Fix packed file cache cleanup issue (#59892) (#60052)
Cherry-picked from https://github.com/apache/doris/pull/59892
---
be/src/io/fs/packed_file_manager.cpp | 121 ++++++++++++++++++++++++-
be/src/io/fs/packed_file_system.cpp | 42 ++++++---
be/test/io/fs/packed_file_concurrency_test.cpp | 15 ++-
3 files changed, 160 insertions(+), 18 deletions(-)
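The core of the fix: cache entries for data living inside a packed file are now keyed by each small file's own path (the diff hashes path.filename() via BlockFileCache::hash), so that Rowset::clear_cache(), which evicts by segment path, can actually find and remove them. Below is a minimal standalone sketch of that keying mismatch; the cache_key() helper and the example paths are illustrative stand-ins, not the real Doris types.

    // Sketch: why keying the cache by the packed file path leaks entries.
    // cache_key() is a stand-in for BlockFileCache::hash(path.filename()).
    #include <filesystem>
    #include <functional>
    #include <iostream>
    #include <string>

    static size_t cache_key(const std::string& file_path) {
        return std::hash<std::string>{}(
                std::filesystem::path(file_path).filename().string());
    }

    int main() {
        const std::string segment = "/tablet_1/rowset_abc/02000001_0.dat";
        const std::string packed = "/tablet_1/packed/packed_42.dat";

        // Before the fix: data written under the packed file's key,
        // but cleanup (Rowset::clear_cache) looks up the segment's key.
        bool evictable = cache_key(packed) == cache_key(segment);
        std::cout << (evictable ? "evicted" : "leaked: keys differ") << "\n";

        // After the fix: writes use the segment path, so the keys match.
        evictable = cache_key(segment) == cache_key(segment);
        std::cout << (evictable ? "evicted" : "leaked") << "\n";
        return 0;
    }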
diff --git a/be/src/io/fs/packed_file_manager.cpp b/be/src/io/fs/packed_file_manager.cpp
index 3cf65f12e49..53a445502c7 100644
--- a/be/src/io/fs/packed_file_manager.cpp
+++ b/be/src/io/fs/packed_file_manager.cpp
@@ -41,10 +41,15 @@
#include "cloud/config.h"
#include "common/config.h"
#include "gen_cpp/cloud.pb.h"
+#include "io/cache/block_file_cache.h"
+#include "io/cache/block_file_cache_factory.h"
+#include "io/cache/file_block.h"
+#include "io/cache/file_cache_common.h"
#include "io/fs/packed_file_trailer.h"
#include "olap/storage_engine.h"
#include "runtime/exec_env.h"
#include "util/coding.h"
+#include "util/slice.h"
#include "util/uid_util.h"
namespace doris::io {
@@ -75,6 +80,12 @@ bvar::Window<bvar::IntRecorder> g_packed_file_uploading_to_uploaded_ms_window(
        "packed_file_uploading_to_uploaded_ms", &g_packed_file_uploading_to_uploaded_ms_recorder,
        /*window_size=*/10);
+// Metrics for async small file cache write
+bvar::Adder<int64_t> g_packed_file_cache_async_write_count("packed_file_cache",
+                                                           "async_write_count");
+bvar::Adder<int64_t> g_packed_file_cache_async_write_bytes("packed_file_cache",
+                                                           "async_write_bytes");
+
Status append_packed_info_trailer(FileWriter* writer, const std::string& packed_file_path,
                                  const cloud::PackedFileInfoPB& packed_file_info) {
    if (writer == nullptr) {
@@ -108,6 +119,104 @@ Status append_packed_info_trailer(FileWriter* writer, const std::string& packed_
    return writer->append(Slice(trailer));
}
+// write small file data to file cache
+void do_write_to_file_cache(const std::string& small_file_path, const std::string& data,
+                            int64_t tablet_id) {
+    if (data.empty()) {
+        return;
+    }
+
+    // Generate cache key from small file path (e.g., "rowset_id_seg_id.dat")
+    Path path(small_file_path);
+    UInt128Wrapper cache_hash = BlockFileCache::hash(path.filename().native());
+
+    VLOG_DEBUG << "packed_file_cache_write: path=" << small_file_path
+               << " filename=" << path.filename().native() << " hash=" << cache_hash.to_string()
+               << " size=" << data.size() << " tablet_id=" << tablet_id;
+
+    BlockFileCache* file_cache = FileCacheFactory::instance()->get_by_path(cache_hash);
+    if (file_cache == nullptr) {
+        return; // Cache not available, skip
+    }
+
+    // Allocate cache blocks
+    CacheContext ctx;
+    ctx.cache_type = FileCacheType::NORMAL;
+    ReadStatistics stats;
+    ctx.stats = &stats;
+
+    FileBlocksHolder holder = file_cache->get_or_set(cache_hash, 0, data.size(), ctx);
+
+    // Write data to cache blocks
+    size_t data_offset = 0;
+    for (auto& block : holder.file_blocks) {
+        if (data_offset >= data.size()) {
+            break;
+        }
+        size_t block_size = block->range().size();
+        size_t write_size = std::min(block_size, data.size() - data_offset);
+
+        if (block->state() == FileBlock::State::EMPTY) {
+            block->get_or_set_downloader();
+            if (block->is_downloader()) {
+                Slice s(data.data() + data_offset, write_size);
+                Status st = block->append(s);
+                if (st.ok()) {
+                    st = block->finalize();
+                }
+                if (!st.ok()) {
+                    LOG(WARNING) << "Write small file to cache failed: " << st.msg();
+                }
+            }
+        }
+        data_offset += write_size;
+    }
+}
+
+// Async wrapper: submit cache write task to thread pool
+// The data is copied into the lambda capture to ensure its lifetime extends beyond
+// the async task execution. The original Slice may reference a buffer that gets
+// reused or freed before the async task runs.
+void write_small_file_to_cache_async(const std::string& small_file_path, const Slice& data,
+                                     int64_t tablet_id) {
+    if (!config::enable_file_cache || data.size == 0) {
+        return;
+    }
+
+    // Copy data since original buffer may be reused before async task executes
+    // For small files (< 1MB), copy overhead is acceptable
+    std::string data_copy(data.data, data.size);
+    size_t data_size = data.size;
+
+    auto* thread_pool = ExecEnv::GetInstance()->s3_file_upload_thread_pool();
+    if (thread_pool == nullptr) {
+        // Fallback to sync write if thread pool not available
+        do_write_to_file_cache(small_file_path, data_copy, tablet_id);
+        return;
+    }
+
+    // Track async write count and bytes
+    g_packed_file_cache_async_write_count << 1;
+    g_packed_file_cache_async_write_bytes << static_cast<int64_t>(data_size);
+
+    Status st = thread_pool->submit_func(
+            [path = small_file_path, data = std::move(data_copy), tablet_id, data_size]() {
+                do_write_to_file_cache(path, data, tablet_id);
+                // Decrement async write count after completion
+                g_packed_file_cache_async_write_count << -1;
+                g_packed_file_cache_async_write_bytes << -static_cast<int64_t>(data_size);
+            });
+
+    if (!st.ok()) {
+        // Revert metrics since task was not submitted
+        g_packed_file_cache_async_write_count << -1;
+        g_packed_file_cache_async_write_bytes << -static_cast<int64_t>(data_size);
+        LOG(WARNING) << "Failed to submit cache write task for " << small_file_path << ": "
+                     << st.msg();
+        // Don't block on failure, cache write is best-effort
+    }
+}
+
} // namespace
PackedFileManager* PackedFileManager::instance() {
@@ -150,8 +259,11 @@ Status PackedFileManager::create_new_packed_file_context(
    // Create file writer for the packed file
    FileWriterPtr new_writer;
    FileWriterOptions opts;
-    // enable write file cache for packed file
-    opts.write_file_cache = true;
+    // Disable write_file_cache for packed file itself.
+    // We write file cache for each small file separately in append_small_file()
+    // using the small file path as cache key, ensuring cache entries can be
+    // properly cleaned up when stale rowsets are removed.
+    opts.write_file_cache = false;
    RETURN_IF_ERROR(
            packed_file_ctx->file_system->create_file(Path(relative_path), &new_writer, &opts));
    packed_file_ctx->writer = std::move(new_writer);
@@ -253,6 +365,11 @@ Status PackedFileManager::append_small_file(const std::string& path, const Slice
    // Write data to current packed file
    RETURN_IF_ERROR(active_state->writer->append(data));
+    // Async write data to file cache using small file path as cache key.
+    // This ensures cache key matches the cleanup key in Rowset::clear_cache(),
+    // allowing proper cache cleanup when stale rowsets are removed.
+    write_small_file_to_cache_async(path, data, info.tablet_id);
+
    // Update index
    PackedSliceLocation location;
    location.packed_file_path = active_state->packed_file_path;
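A note on the async pattern added above: the Slice's bytes are copied into a std::string that the lambda owns, because the caller may reuse or free the underlying buffer before the pool runs the task, and the bvar counters are reverted if submit_func fails. The following is a self-contained sketch of that lifetime-plus-revert pattern, with std::async standing in for the BE thread pool and std::atomic for bvar::Adder (both substitutions are assumptions for illustration).

    // Sketch of the capture-by-copy + revert-on-submit-failure pattern.
    // std::async stands in for the thread pool; std::atomic for bvar::Adder.
    #include <atomic>
    #include <future>
    #include <iostream>
    #include <string>
    #include <string_view>

    std::atomic<int64_t> g_async_write_count{0};
    std::atomic<int64_t> g_async_write_bytes{0};

    void write_to_cache(const std::string& path, const std::string& data) {
        std::cout << "cached " << data.size() << " bytes for " << path << "\n";
    }

    void write_async(const std::string& path, std::string_view data) {
        // Copy: the caller's buffer may be reused before the task runs.
        std::string data_copy(data);
        const int64_t n = static_cast<int64_t>(data_copy.size());

        g_async_write_count += 1;
        g_async_write_bytes += n;
        try {
            // Move the copy into the task so it owns the bytes.
            auto fut = std::async(std::launch::async,
                                  [path, d = std::move(data_copy), n]() {
                                      write_to_cache(path, d);
                                      g_async_write_count -= 1;
                                      g_async_write_bytes -= n;
                                  });
            fut.wait(); // demo only; the real code fires and forgets
        } catch (const std::system_error&) {
            // Submission failed: revert the metrics, best-effort semantics.
            g_async_write_count -= 1;
            g_async_write_bytes -= n;
        }
    }

    int main() {
        write_async("/tablet_1/rowset_abc/0_0.dat", "hello");
        return 0;
    }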
diff --git a/be/src/io/fs/packed_file_system.cpp b/be/src/io/fs/packed_file_system.cpp
index dd5b136ba3b..7ce027a94a2 100644
--- a/be/src/io/fs/packed_file_system.cpp
+++ b/be/src/io/fs/packed_file_system.cpp
@@ -20,6 +20,7 @@
#include <utility>
#include "common/status.h"
+#include "io/fs/file_reader.h"
#include "io/fs/packed_file_reader.h"
#include "io/fs/packed_file_writer.h"
@@ -69,18 +70,37 @@ Status PackedFileSystem::open_file_impl(const Path& file, FileReaderSPtr* reader
        // File is in packed file, open packed file and wrap with PackedFileReader
        const auto& index = it->second;
        FileReaderSPtr inner_reader;
-        // Create a new FileReaderOptions with the correct file size
-        FileReaderOptions local_opts = opts ? *opts : FileReaderOptions();
-        // Set file_size to packed file size to avoid head object request
-        local_opts.file_size = index.packed_file_size;
-        LOG(INFO) << "open packed file: " << index.packed_file_path << ", file: " << file.native()
-                  << ", offset: " << index.offset << ", size: " << index.size
-                  << ", packed_file_size: " << index.packed_file_size;
+
+        // Create options for opening the packed file
+        // Disable cache at this layer - we'll add cache wrapper around PackedFileReader instead
+        // This ensures cache key is based on segment path, not packed file path
+        FileReaderOptions inner_opts = opts ? *opts : FileReaderOptions();
+        inner_opts.file_size = index.packed_file_size;
+        inner_opts.cache_type = FileCachePolicy::NO_CACHE;
+
+        VLOG_DEBUG << "open packed file: " << index.packed_file_path << ", file: " << file.native()
+                   << ", offset: " << index.offset << ", size: " << index.size
+                   << ", packed_file_size: " << index.packed_file_size;
        RETURN_IF_ERROR(
-                _inner_fs->open_file(Path(index.packed_file_path), &inner_reader, &local_opts));
+                _inner_fs->open_file(Path(index.packed_file_path), &inner_reader, &inner_opts));
+
+        // Create PackedFileReader with segment path
+        // PackedFileReader.path() returns segment path, not packed file path
+        auto packed_reader = std::make_shared<PackedFileReader>(std::move(inner_reader), file,
+                                                                index.offset, index.size);
-        *reader = std::make_shared<PackedFileReader>(std::move(inner_reader), file, index.offset,
-                                                     index.size);
+        // If cache is requested, wrap PackedFileReader with CachedRemoteFileReader
+        // This ensures:
+        // 1. Cache key = hash(segment_path.filename()) - matches cleanup key
+        // 2. Cache size = segment size - correct boundary
+        // 3. Each segment has independent cache entry - no interference during cleanup
+        if (opts && opts->cache_type != FileCachePolicy::NO_CACHE) {
+            FileReaderOptions cache_opts = *opts;
+            cache_opts.file_size = index.size; // Use segment size for cache
+            *reader = DORIS_TRY(create_cached_file_reader(packed_reader, cache_opts));
+        } else {
+            *reader = packed_reader;
+        }
    } else {
        RETURN_IF_ERROR(_inner_fs->open_file(file, reader, opts));
    }
@@ -88,7 +108,7 @@ Status PackedFileSystem::open_file_impl(const Path& file, FileReaderSPtr* reader
}
Status PackedFileSystem::exists_impl(const Path& path, bool* res) const {
-    LOG(INFO) << "packed file system exist, rowset id " << _append_info.rowset_id;
+    VLOG_DEBUG << "packed file system exist, rowset id " << _append_info.rowset_id;
    if (!_index_map_initialized) {
        return Status::InternalError("PackedFileSystem index map is not initialized");
    }
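The structural point of this hunk is the wrapping order: the cache layer now sits outside PackedFileReader, so the cache derives its key from the segment path and its boundary from the segment size instead of the packed file. A minimal sketch of that decorator layering follows; Reader, RangeReader, and CachedReader are hypothetical simplified interfaces, not the actual Doris FileReader API.

    // Sketch: cache decorator wrapping a range-view reader, so the cache
    // layer keys and sizes by the logical (segment) file.
    #include <iostream>
    #include <memory>
    #include <string>

    struct Reader {
        virtual ~Reader() = default;
        virtual std::string path() const = 0;
        virtual size_t size() const = 0;
    };

    // Exposes a slice of the packed file under the segment's own path,
    // like PackedFileReader in the diff.
    struct RangeReader : Reader {
        RangeReader(std::string seg_path, size_t len)
                : _path(std::move(seg_path)), _len(len) {}
        std::string path() const override { return _path; }
        size_t size() const override { return _len; }
        std::string _path;
        size_t _len;
    };

    // Cache layer: whatever it wraps defines the cache key and boundary.
    struct CachedReader : Reader {
        explicit CachedReader(std::shared_ptr<Reader> inner)
                : _inner(std::move(inner)) {}
        std::string path() const override { return _inner->path(); }
        size_t size() const override { return _inner->size(); }
        std::shared_ptr<Reader> _inner;
    };

    int main() {
        auto seg = std::make_shared<RangeReader>("/tablet_1/rowset_abc/0_0.dat", 4096);
        CachedReader cached(seg); // cache outside: key = segment path
        std::cout << "cache key from: " << cached.path()
                  << ", boundary: " << cached.size() << "\n";
        return 0;
    }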
diff --git a/be/test/io/fs/packed_file_concurrency_test.cpp b/be/test/io/fs/packed_file_concurrency_test.cpp
index 06db472b95e..31c2db19fb8 100644
--- a/be/test/io/fs/packed_file_concurrency_test.cpp
+++ b/be/test/io/fs/packed_file_concurrency_test.cpp
@@ -677,7 +677,10 @@ TEST_F(MergeFileConcurrencyTest, ConcurrentWriteReadCorrectness) {
        std::uniform_int_distribution<int> read_size_dist(4 * 1024, 32 * 1024);
        for (int iter = 0; iter < kIterationPerThread; ++iter) {
-            std::string path = fmt::format("/tablet_{}/rowset_{}/file_{}", tid, iter, iter);
+            // Use unique file names to avoid cache key conflicts between threads
+            // since CachedRemoteFileReader uses path().filename() for cache hash
+            std::string path =
+                    fmt::format("/tablet_{}/rowset_{}/file_t{}_i{}", tid, iter, tid, iter);
            PackedAppendContext append_info;
            append_info.resource_id = resource_ids[tid];
@@ -718,11 +721,13 @@ TEST_F(MergeFileConcurrencyTest, ConcurrentWriteReadCorrectness) {
                opts.cache_type = FileCachePolicy::FILE_BLOCK_CACHE;
                opts.is_doris_table = true;
                ASSERT_TRUE(reader_fs.open_file(Path(path), &reader, &opts).ok());
-                auto* merge_reader = dynamic_cast<PackedFileReader*>(reader.get());
-                ASSERT_NE(merge_reader, nullptr);
-                auto* cached_reader =
-                        dynamic_cast<CachedRemoteFileReader*>(merge_reader->_inner_reader.get());
+                // After the fix, CachedRemoteFileReader wraps PackedFileReader (not vice versa)
+                // This ensures cache key uses segment path for proper cleanup
+                auto* cached_reader = dynamic_cast<CachedRemoteFileReader*>(reader.get());
                ASSERT_NE(cached_reader, nullptr);
+                auto* merge_reader =
+                        dynamic_cast<PackedFileReader*>(cached_reader->get_remote_reader());
+                ASSERT_NE(merge_reader, nullptr);
                IOContext io_ctx;
                size_t verified = 0;
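The test rename matters because the cache hashes only path().filename(): with the old naming, two threads at the same iteration both produced a file named file_<iter>, colliding on one cache key despite living in different tablet directories. A tiny sketch of the collision, with std::hash standing in for BlockFileCache::hash:

    // Sketch: filename-only hashing collides across directories unless the
    // filename itself is unique.
    #include <filesystem>
    #include <functional>
    #include <iostream>
    #include <string>

    static size_t key(const std::string& p) {
        return std::hash<std::string>{}(
                std::filesystem::path(p).filename().string());
    }

    int main() {
        std::cout << std::boolalpha;
        // Old naming: same filename "file_3" from two threads -> same key.
        std::cout << (key("/tablet_0/rowset_3/file_3") ==
                      key("/tablet_1/rowset_3/file_3")) << "\n";  // true
        // New naming embeds the thread id, so the keys differ.
        std::cout << (key("/tablet_0/rowset_3/file_t0_i3") ==
                      key("/tablet_1/rowset_3/file_t1_i3")) << "\n";  // false
        return 0;
    }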
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]