This is an automated email from the ASF dual-hosted git repository.
gavinchou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 58938de7bcf [fix](filecache) pass tablet_id through FileReaderOptions
instead of parsing from path (#61683)
58938de7bcf is described below
commit 58938de7bcf4246650c7b751b00f54af7abc5d9a
Author: deardeng <[email protected]>
AuthorDate: Sat Mar 28 18:22:54 2026 +0800
[fix](filecache) pass tablet_id through FileReaderOptions instead of
parsing from path (#61683)
CachedRemoteFileReader::_execute_remote_read previously parsed tablet_id
from file paths at runtime via extract_tablet_id(), and
CachedRemoteFileReader::read_at_impl likewise derived the cache context's
tablet_id from the path via get_tablet_id(). This breaks when
enable_packed_file (small file merging) is enabled because packed file
paths don't follow the expected data/{tablet_id}/... format.
Fix: store tablet_id from FileReaderOptions at construction time and use
it directly, eliminating runtime path parsing. Propagate tablet_id
through all code paths: Segment, InvertedIndexFileReader,
FSIndexInput::open, DownloadFileMeta (warmup/preheating), and
beta_rowset consistency checks.
### What problem does this PR solve?
Issue Number: close #xxx
Related PR: #xxx
Problem Summary:
### Release note
None
### Check List (For Author)
- Test <!-- At least one of them must be included. -->
- [ ] Regression test
- [ ] Unit Test
- [ ] Manual test (add detailed scripts or steps below)
- [x] No need to test or manual test. Explain why:
- [ ] This is a refactor/code format and no logic has been changed.
- [x] Previous test can cover this change.
- [ ] No code files have been changed.
- [ ] Other reason <!-- Add your reason? -->
- Behavior changed:
- [x] No.
- [ ] Yes. <!-- Explain the behavior change -->
- Does this need documentation?
- [x] No.
- [ ] Yes. <!-- Add document PR link here. e.g.:
https://github.com/apache/doris-website/pull/1214 -->
### Check List (For Reviewer who merge this PR)
- [ ] Confirm the release note
- [ ] Confirm test cases
- [ ] Confirm document
- [ ] Add branch pick label <!-- Add branch pick label that this PR
should merge into -->
---
be/src/cloud/cloud_internal_service.cpp | 24 +++++----
be/src/cloud/cloud_tablet.cpp | 2 +
be/src/cloud/cloud_warm_up_manager.cpp | 16 ++++--
be/src/cloud/cloud_warm_up_manager.h | 3 +-
be/src/io/cache/block_file_cache_downloader.cpp | 2 +
be/src/io/cache/block_file_cache_downloader.h | 1 +
be/src/io/cache/cached_remote_file_reader.cpp | 57 +++++++++-------------
be/src/io/cache/cached_remote_file_reader.h | 3 +-
be/src/io/tools/file_cache_microbench.cpp | 1 +
.../storage/compaction/collection_statistics.cpp | 2 +-
be/src/storage/compaction/compaction.cpp | 4 +-
be/src/storage/index/index_file_reader.cpp | 10 ++--
be/src/storage/index/index_file_reader.h | 7 ++-
be/src/storage/index/index_file_writer.cpp | 10 ++--
be/src/storage/index/index_file_writer.h | 4 +-
.../index/inverted/inverted_index_fs_directory.cpp | 4 +-
.../index/inverted/inverted_index_fs_directory.h | 3 +-
be/src/storage/rowset/beta_rowset.cpp | 4 +-
be/src/storage/rowset/beta_rowset_writer.cpp | 3 +-
be/src/storage/rowset/rowset_writer.h | 2 +-
be/src/storage/segment/segment.cpp | 11 ++++-
be/src/storage/segment/segment.h | 1 +
be/src/storage/task/index_builder.cpp | 10 ++--
be/test/io/cache/block_file_cache_test.cpp | 13 +++++
.../io/cache/block_file_cache_test_lru_dump.cpp | 1 +
be/test/io/fs/packed_file_concurrency_test.cpp | 2 +
26 files changed, 125 insertions(+), 75 deletions(-)
diff --git a/be/src/cloud/cloud_internal_service.cpp
b/be/src/cloud/cloud_internal_service.cpp
index 99b08a9bf38..5f336573894 100644
--- a/be/src/cloud/cloud_internal_service.cpp
+++ b/be/src/cloud/cloud_internal_service.cpp
@@ -579,11 +579,13 @@ void
CloudInternalServiceImpl::warm_up_rowset(google::protobuf::RpcController* c
.expiration_time = expiration_time,
.is_dryrun =
config::enable_reader_dryrun_when_download_file_cache,
.is_warmup = true},
- .download_done = [=, version =
rs_meta.version()](Status st) {
- handle_segment_download_done(st, tablet_id,
rowset_id, segment_id,
- tablet, wait,
version, segment_size,
- request_ts,
handle_ts);
- }};
+ .download_done =
+ [=, version = rs_meta.version()](Status st) {
+ handle_segment_download_done(
+ st, tablet_id, rowset_id,
segment_id, tablet, wait,
+ version, segment_size, request_ts,
handle_ts);
+ },
+ .tablet_id = tablet_id};
g_file_cache_event_driven_warm_up_submitted_segment_num << 1;
g_file_cache_event_driven_warm_up_submitted_segment_size <<
segment_size;
@@ -604,11 +606,13 @@ void
CloudInternalServiceImpl::warm_up_rowset(google::protobuf::RpcController* c
.expiration_time = expiration_time,
.is_dryrun =
config::enable_reader_dryrun_when_download_file_cache,
.is_warmup = true},
- .download_done = [=, version =
rs_meta.version()](Status st) {
- handle_inverted_index_download_done(
- st, tablet_id, rowset_id, segment_id,
index_path, tablet, wait,
- version, idx_size, request_ts, handle_ts);
- }};
+ .download_done =
+ [=, version = rs_meta.version()](Status st) {
+ handle_inverted_index_download_done(
+ st, tablet_id, rowset_id,
segment_id, index_path,
+ tablet, wait, version, idx_size,
request_ts, handle_ts);
+ },
+ .tablet_id = tablet_id};
g_file_cache_event_driven_warm_up_submitted_index_num << 1;
g_file_cache_event_driven_warm_up_submitted_index_size <<
idx_size;
tablet->update_rowset_warmup_state_inverted_idx_num(
diff --git a/be/src/cloud/cloud_tablet.cpp b/be/src/cloud/cloud_tablet.cpp
index 2567019301c..f7be421e442 100644
--- a/be/src/cloud/cloud_tablet.cpp
+++ b/be/src/cloud/cloud_tablet.cpp
@@ -1718,6 +1718,7 @@ void CloudTablet::_submit_segment_download_task(const
RowsetSharedPtr& rs,
LOG_WARNING("add rowset warm up error ").error(st);
}
}},
+ .tablet_id = _tablet_meta->tablet_id(),
});
// clang-format on
}
@@ -1760,6 +1761,7 @@ void
CloudTablet::_submit_inverted_index_download_task(const RowsetSharedPtr& rs
LOG_WARNING("add rowset warm up error ").error(st);
}
}},
+ .tablet_id = _tablet_meta->tablet_id(),
};
self->update_rowset_warmup_state_inverted_idx_num_unlocked(WarmUpTriggerSource::SYNC_ROWSET,
rowset_meta->rowset_id(), 1);
_engine.file_cache_block_downloader().submit_download_task(std::move(meta));
diff --git a/be/src/cloud/cloud_warm_up_manager.cpp
b/be/src/cloud/cloud_warm_up_manager.cpp
index 9df39e8c305..6383850d487 100644
--- a/be/src/cloud/cloud_warm_up_manager.cpp
+++ b/be/src/cloud/cloud_warm_up_manager.cpp
@@ -129,7 +129,8 @@ void CloudWarmUpManager::submit_download_tasks(io::Path
path, int64_t file_size,
io::FileSystemSPtr file_system,
int64_t expiration_time,
std::shared_ptr<bthread::CountdownEvent> wait,
- bool is_index,
std::function<void(Status)> done_cb) {
+ bool is_index,
std::function<void(Status)> done_cb,
+ int64_t tablet_id) {
VLOG_DEBUG << "submit warm up task for file: " << path << ", file_size: "
<< file_size
<< ", expiration_time: " << expiration_time
<< ", is_index: " << (is_index ? "true" : "false");
@@ -184,6 +185,7 @@ void CloudWarmUpManager::submit_download_tasks(io::Path
path, int64_t file_size,
}
wait->signal();
},
+ .tablet_id = tablet_id,
});
offset += current_chunk_size;
@@ -256,7 +258,8 @@ void CloudWarmUpManager::handle_jobs() {
submit_download_tasks(
storage_resource.value()->remote_segment_path(*rs, seg_id),
rs->segment_file_size(cast_set<int>(seg_id)),
rs->fs(),
- expiration_time, wait, false, [tablet, rs,
seg_id](Status st) {
+ expiration_time, wait, false,
+ [tablet, rs, seg_id](Status st) {
VLOG_DEBUG << "warmup rowset " <<
rs->version() << " segment "
<< seg_id << " completed";
if (tablet->complete_rowset_segment_warmup(
@@ -266,7 +269,8 @@ void CloudWarmUpManager::handle_jobs() {
VLOG_DEBUG << "warmup rowset " <<
rs->version()
<< " completed";
}
- });
+ },
+ tablet_id);
}
// 2nd. download inverted index files
@@ -313,7 +317,8 @@ void CloudWarmUpManager::handle_jobs() {
VLOG_DEBUG << "warmup rowset " <<
rs->version()
<< " completed";
}
- });
+ },
+ tablet_id);
}
} else {
if (schema_ptr->has_inverted_index() ||
schema_ptr->has_ann_index()) {
@@ -336,7 +341,8 @@ void CloudWarmUpManager::handle_jobs() {
VLOG_DEBUG << "warmup rowset " <<
rs->version()
<< " completed";
}
- });
+ },
+ tablet_id);
}
}
}
diff --git a/be/src/cloud/cloud_warm_up_manager.h
b/be/src/cloud/cloud_warm_up_manager.h
index 47411f657b8..992702f162e 100644
--- a/be/src/cloud/cloud_warm_up_manager.h
+++ b/be/src/cloud/cloud_warm_up_manager.h
@@ -114,7 +114,8 @@ private:
void submit_download_tasks(io::Path path, int64_t file_size,
io::FileSystemSPtr file_system,
int64_t expiration_time,
std::shared_ptr<bthread::CountdownEvent> wait,
bool is_index = false,
- std::function<void(Status)> done_cb = nullptr);
+ std::function<void(Status)> done_cb = nullptr,
+ int64_t tablet_id = -1);
std::mutex _mtx;
std::condition_variable _cond;
int64_t _cur_job_id {0};
diff --git a/be/src/io/cache/block_file_cache_downloader.cpp
b/be/src/io/cache/block_file_cache_downloader.cpp
index ae5fff244da..1c4ca8577f6 100644
--- a/be/src/io/cache/block_file_cache_downloader.cpp
+++ b/be/src/io/cache/block_file_cache_downloader.cpp
@@ -286,6 +286,7 @@ void FileCacheBlockDownloader::download_file_cache_block(
.is_warmup = true,
},
.download_done = std::move(download_done),
+ .tablet_id = meta.tablet_id(),
};
download_segment_file(download_meta);
});
@@ -300,6 +301,7 @@ void FileCacheBlockDownloader::download_segment_file(const
DownloadFileMeta& met
.is_doris_table = true,
.cache_base_path {},
.file_size = meta.file_size,
+ .tablet_id = meta.tablet_id,
};
auto st = meta.file_system->open_file(meta.path, &file_reader, &opts);
if (!st.ok()) {
diff --git a/be/src/io/cache/block_file_cache_downloader.h
b/be/src/io/cache/block_file_cache_downloader.h
index c9a46891673..29d55a4c6cc 100644
--- a/be/src/io/cache/block_file_cache_downloader.h
+++ b/be/src/io/cache/block_file_cache_downloader.h
@@ -44,6 +44,7 @@ struct DownloadFileMeta {
io::FileSystemSPtr file_system;
IOContext ctx;
std::function<void(Status)> download_done;
+ int64_t tablet_id {-1};
};
struct DownloadTask {
diff --git a/be/src/io/cache/cached_remote_file_reader.cpp
b/be/src/io/cache/cached_remote_file_reader.cpp
index 083d4a5e3f5..ee5f4ceadbd 100644
--- a/be/src/io/cache/cached_remote_file_reader.cpp
+++ b/be/src/io/cache/cached_remote_file_reader.cpp
@@ -55,7 +55,6 @@
#include "runtime/thread_context.h"
#include "runtime/workload_management/io_throttle.h"
#include "service/backend_options.h"
-#include "storage/storage_policy.h"
#include "util/bit_util.h"
#include "util/brpc_client_cache.h" // BrpcClientCache
#include "util/client_cache.h"
@@ -93,7 +92,8 @@ bvar::Adder<uint64_t> g_failed_get_peer_addr_counter(
CachedRemoteFileReader::CachedRemoteFileReader(FileReaderSPtr
remote_file_reader,
const FileReaderOptions& opts)
- : _remote_file_reader(std::move(remote_file_reader)) {
+ : _tablet_id(opts.tablet_id),
_remote_file_reader(std::move(remote_file_reader)) {
+ DCHECK(!opts.is_doris_table || _tablet_id > 0);
_is_doris_table = opts.is_doris_table;
if (_is_doris_table) {
_cache_hash = BlockFileCache::hash(path().filename().native());
@@ -159,28 +159,30 @@ std::pair<size_t, size_t>
CachedRemoteFileReader::s_align_size(size_t offset, si
}
namespace {
-std::optional<int64_t> extract_tablet_id(const std::string& file_path) {
- return StorageResource::parse_tablet_id_from_path(file_path);
+// Execute S3 read
+Status execute_s3_read(size_t empty_start, size_t& size,
std::unique_ptr<char[]>& buffer,
+ ReadStatistics& stats, const IOContext* io_ctx,
+ FileReaderSPtr remote_file_reader) {
+ s3_read_counter << 1;
+ SCOPED_RAW_TIMER(&stats.remote_read_timer);
+ stats.from_peer_cache = false;
+ return remote_file_reader->read_at(empty_start, Slice(buffer.get(), size),
&size, io_ctx);
}
// Get peer connection info from tablet_id
-std::pair<std::string, int> get_peer_connection_info(const std::string&
file_path) {
+std::pair<std::string, int> get_peer_connection_info(int64_t tablet_id,
+ const std::string&
file_path) {
std::string host = "";
int port = 0;
- // Try to get tablet_id from actual path and lookup tablet info
- if (auto tablet_id = extract_tablet_id(file_path)) {
- auto& manager =
ExecEnv::GetInstance()->storage_engine().to_cloud().cloud_warm_up_manager();
- if (auto tablet_info = manager.get_balanced_tablet_info(*tablet_id)) {
- host = tablet_info->first;
- port = tablet_info->second;
- } else {
- VLOG_DEBUG << "get peer connection info not found"
- << ", tablet_id=" << *tablet_id << ", file_path=" <<
file_path;
- }
+ DCHECK(tablet_id > 0);
+ auto& manager =
ExecEnv::GetInstance()->storage_engine().to_cloud().cloud_warm_up_manager();
+ if (auto tablet_info = manager.get_balanced_tablet_info(tablet_id)) {
+ host = tablet_info->first;
+ port = tablet_info->second;
} else {
- VLOG_DEBUG << "parse tablet id from path failed"
- << "tablet_id=null, file_path=" << file_path;
+ LOG_EVERY_N(WARNING, 100) << "get peer connection info not found"
+ << ", tablet_id=" << tablet_id << ",
file_path=" << file_path;
}
DBUG_EXECUTE_IF("PeerFileCacheReader::_fetch_from_peer_cache_blocks", {
@@ -200,8 +202,8 @@ std::pair<std::string, int> get_peer_connection_info(const
std::string& file_pat
Status execute_peer_read(const std::vector<FileBlockSPtr>& empty_blocks,
size_t empty_start,
size_t& size, std::unique_ptr<char[]>& buffer,
const std::string& file_path, size_t file_size, bool
is_doris_table,
- ReadStatistics& stats, const IOContext* io_ctx) {
- auto [host, port] = get_peer_connection_info(file_path);
+ int64_t tablet_id, ReadStatistics& stats, const
IOContext* io_ctx) {
+ auto [host, port] = get_peer_connection_info(tablet_id, file_path);
VLOG_DEBUG << "PeerFileCacheReader read from peer, host=" << host << ",
port=" << port
<< ", file_path=" << file_path;
@@ -224,16 +226,6 @@ Status execute_peer_read(const std::vector<FileBlockSPtr>&
empty_blocks, size_t
return st;
}
-// Execute S3 read
-Status execute_s3_read(size_t empty_start, size_t& size,
std::unique_ptr<char[]>& buffer,
- ReadStatistics& stats, const IOContext* io_ctx,
- FileReaderSPtr remote_file_reader) {
- s3_read_counter << 1;
- SCOPED_RAW_TIMER(&stats.remote_read_timer);
- stats.from_peer_cache = false;
- return remote_file_reader->read_at(empty_start, Slice(buffer.get(), size),
&size, io_ctx);
-}
-
} // anonymous namespace
Status CachedRemoteFileReader::_execute_remote_read(const
std::vector<FileBlockSPtr>& empty_blocks,
@@ -255,7 +247,7 @@ Status CachedRemoteFileReader::_execute_remote_read(const
std::vector<FileBlockS
return execute_s3_read(empty_start, size, buffer, stats, io_ctx,
_remote_file_reader);
} else {
return execute_peer_read(empty_blocks, empty_start, size, buffer,
path().native(),
- this->size(), _is_doris_table, stats,
io_ctx);
+ this->size(), _is_doris_table,
_tablet_id, stats, io_ctx);
}
});
@@ -269,7 +261,7 @@ Status CachedRemoteFileReader::_execute_remote_read(const
std::vector<FileBlockS
// ATTN: Save original size before peer read, as it may be modified by
fetch_blocks, read peer ref size
size_t original_size = size;
auto st = execute_peer_read(empty_blocks, empty_start, size, buffer,
path().native(),
- this->size(), _is_doris_table, stats,
io_ctx);
+ this->size(), _is_doris_table, _tablet_id,
stats, io_ctx);
if (!st.ok()) {
// Restore original size for S3 fallback, as peer read may have
modified it
size = original_size;
@@ -386,8 +378,7 @@ Status CachedRemoteFileReader::read_at_impl(size_t offset,
Slice result, size_t*
s_align_size(offset + already_read, bytes_req - already_read,
size());
CacheContext cache_context(io_ctx);
cache_context.stats = &stats;
- auto tablet_id = get_tablet_id(path().string());
- cache_context.tablet_id = tablet_id.value_or(0);
+ cache_context.tablet_id = _tablet_id;
MonotonicStopWatch sw;
sw.start();
diff --git a/be/src/io/cache/cached_remote_file_reader.h
b/be/src/io/cache/cached_remote_file_reader.h
index a0037d42c64..3f2e1ceb2e1 100644
--- a/be/src/io/cache/cached_remote_file_reader.h
+++ b/be/src/io/cache/cached_remote_file_reader.h
@@ -85,7 +85,8 @@ private:
void _update_stats(const ReadStatistics& stats, FileCacheStatistics* state,
bool is_inverted_index) const;
- bool _is_doris_table;
+ bool _is_doris_table = false;
+ int64_t _tablet_id = -1;
FileReaderSPtr _remote_file_reader;
UInt128Wrapper _cache_hash;
BlockFileCache* _cache;
diff --git a/be/src/io/tools/file_cache_microbench.cpp
b/be/src/io/tools/file_cache_microbench.cpp
index 50d0eb70f7b..3725083b835 100644
--- a/be/src/io/tools/file_cache_microbench.cpp
+++ b/be/src/io/tools/file_cache_microbench.cpp
@@ -1496,6 +1496,7 @@ private:
doris::io::FileReaderOptions reader_opts;
reader_opts.cache_type =
doris::io::FileCachePolicy::FILE_BLOCK_CACHE;
reader_opts.is_doris_table = true;
+ reader_opts.tablet_id = 1; // microbench placeholder
doris::io::FileDescription fd;
std::string obj_path = "s3://" +
doris::config::test_s3_bucket + "/";
diff --git a/be/src/storage/compaction/collection_statistics.cpp
b/be/src/storage/compaction/collection_statistics.cpp
index 16decc2b15f..afd79f0c0f3 100644
--- a/be/src/storage/compaction/collection_statistics.cpp
+++ b/be/src/storage/compaction/collection_statistics.cpp
@@ -165,7 +165,7 @@ Status CollectionStatistics::process_segment(const
RowsetSharedPtr& rowset, int3
rowset_meta->fs(),
std::string
{InvertedIndexDescriptor::get_index_file_path_prefix(seg_path)},
tablet_schema->get_inverted_index_storage_format(),
- rowset_meta->inverted_index_file_info(seg_id));
+ rowset_meta->inverted_index_file_info(seg_id),
rowset_meta->tablet_id());
RETURN_IF_ERROR(idx_file_reader->init(config::inverted_index_read_buffer_size,
io_ctx));
int32_t total_seg_num_docs = 0;
diff --git a/be/src/storage/compaction/compaction.cpp
b/be/src/storage/compaction/compaction.cpp
index 698c81f7849..322530cbb66 100644
--- a/be/src/storage/compaction/compaction.cpp
+++ b/be/src/storage/compaction/compaction.cpp
@@ -819,7 +819,7 @@ Status Compaction::do_inverted_index_compaction() {
fs,
std::string
{InvertedIndexDescriptor::get_index_file_path_prefix(seg_path.value())},
_cur_tablet_schema->get_inverted_index_storage_format(),
- rowset->rowset_meta()->inverted_index_file_info(seg_id));
+ rowset->rowset_meta()->inverted_index_file_info(seg_id),
_tablet->tablet_id());
auto st =
index_file_reader->init(config::inverted_index_read_buffer_size);
DBUG_EXECUTE_IF("Compaction::do_inverted_index_compaction_init_inverted_index_file_reader",
{
@@ -1020,7 +1020,7 @@ static bool check_rowset_has_inverted_index(const
RowsetSharedPtr& src_rs, int32
std::string
{InvertedIndexDescriptor::get_index_file_path_prefix(
seg_path.value())},
cur_tablet_schema->get_inverted_index_storage_format(),
- rowset->rowset_meta()->inverted_index_file_info(i));
+ rowset->rowset_meta()->inverted_index_file_info(i),
tablet->tablet_id());
auto st =
index_file_reader->init(config::inverted_index_read_buffer_size);
index_file_path =
index_file_reader->get_index_file_path(index_meta);
DBUG_EXECUTE_IF(
diff --git a/be/src/storage/index/index_file_reader.cpp
b/be/src/storage/index/index_file_reader.cpp
index e4ed32fdc30..348e1399421 100644
--- a/be/src/storage/index/index_file_reader.cpp
+++ b/be/src/storage/index/index_file_reader.cpp
@@ -63,8 +63,9 @@ Status IndexFileReader::_init_from(int32_t read_buffer_size,
const io::IOContext
DCHECK(_fs != nullptr) << "file system is nullptr,
index_file_full_path: "
<< index_file_full_path;
// 2. open file
- auto ok = DorisFSDirectory::FSIndexInput::open(
- _fs, index_file_full_path.c_str(), index_input, err,
read_buffer_size, file_size);
+ auto ok =
+ DorisFSDirectory::FSIndexInput::open(_fs,
index_file_full_path.c_str(), index_input,
+ err, read_buffer_size,
file_size, _tablet_id);
if (!ok) {
if (err.number() == CL_ERR_FileNotFound) {
return Status::Error<ErrorCode::INVERTED_INDEX_FILE_NOT_FOUND>(
@@ -182,8 +183,9 @@ Result<std::unique_ptr<DorisCompoundReader,
DirectoryDeleter>> IndexFileReader::
DCHECK(_fs != nullptr)
<< "file system is nullptr, index_file_path: " <<
index_file_path;
// 2. open file
- auto ok = DorisFSDirectory::FSIndexInput::open(
- _fs, index_file_path.c_str(), index_input, err,
_read_buffer_size, file_size);
+ auto ok = DorisFSDirectory::FSIndexInput::open(_fs,
index_file_path.c_str(),
+ index_input, err,
_read_buffer_size,
+ file_size,
_tablet_id);
if (!ok) {
// now index_input = nullptr
if (err.number() == CL_ERR_FileNotFound) {
diff --git a/be/src/storage/index/index_file_reader.h
b/be/src/storage/index/index_file_reader.h
index 8a185f6b8b7..72617446580 100644
--- a/be/src/storage/index/index_file_reader.h
+++ b/be/src/storage/index/index_file_reader.h
@@ -52,11 +52,13 @@ public:
IndexFileReader(io::FileSystemSPtr fs, std::string index_path_prefix,
InvertedIndexStorageFormatPB storage_format,
- InvertedIndexFileInfo idx_file_info =
InvertedIndexFileInfo())
+ InvertedIndexFileInfo idx_file_info =
InvertedIndexFileInfo(),
+ int64_t tablet_id = -1)
: _fs(std::move(fs)),
_index_path_prefix(std::move(index_path_prefix)),
_storage_format(storage_format),
- _idx_file_info(idx_file_info) {}
+ _idx_file_info(idx_file_info),
+ _tablet_id(tablet_id) {}
virtual ~IndexFileReader() = default;
MOCK_FUNCTION Status init(int32_t read_buffer_size =
config::inverted_index_read_buffer_size,
@@ -90,6 +92,7 @@ private:
mutable std::shared_mutex _mutex; // Use mutable for const read operations
bool _inited = false;
InvertedIndexFileInfo _idx_file_info;
+ int64_t _tablet_id = -1;
};
} // namespace segment_v2
diff --git a/be/src/storage/index/index_file_writer.cpp
b/be/src/storage/index/index_file_writer.cpp
index 9acfadf4172..2d9c4014418 100644
--- a/be/src/storage/index/index_file_writer.cpp
+++ b/be/src/storage/index/index_file_writer.cpp
@@ -41,7 +41,8 @@ namespace doris::segment_v2 {
IndexFileWriter::IndexFileWriter(io::FileSystemSPtr fs, std::string
index_path_prefix,
std::string rowset_id, int64_t seg_id,
InvertedIndexStorageFormatPB storage_format,
- io::FileWriterPtr file_writer, bool
can_use_ram_dir)
+ io::FileWriterPtr file_writer, bool
can_use_ram_dir,
+ int64_t tablet_id)
: _fs(std::move(fs)),
_index_path_prefix(std::move(index_path_prefix)),
_rowset_id(std::move(rowset_id)),
@@ -49,7 +50,8 @@ IndexFileWriter::IndexFileWriter(io::FileSystemSPtr fs,
std::string index_path_p
_storage_format(storage_format),
_local_fs(io::global_local_filesystem()),
_idx_v2_writer(std::move(file_writer)),
- _can_use_ram_dir(can_use_ram_dir) {
+ _can_use_ram_dir(can_use_ram_dir),
+ _tablet_id(tablet_id) {
auto tmp_file_dir =
ExecEnv::GetInstance()->get_tmp_file_dirs()->get_tmp_file_dir();
_tmp_dir = tmp_file_dir.native();
if (_storage_format == InvertedIndexStorageFormatPB::V1) {
@@ -121,8 +123,8 @@ Status IndexFileWriter::delete_index(const TabletIndex*
index_meta) {
}
Status IndexFileWriter::add_into_searcher_cache() {
- auto index_file_reader =
- std::make_unique<IndexFileReader>(_fs, _index_path_prefix,
_storage_format);
+ auto index_file_reader = std::make_unique<IndexFileReader>(
+ _fs, _index_path_prefix, _storage_format, InvertedIndexFileInfo(),
_tablet_id);
auto st = index_file_reader->init();
if (!st.ok()) {
if (dynamic_cast<io::StreamSinkFileWriter*>(_idx_v2_writer.get()) !=
nullptr) {
diff --git a/be/src/storage/index/index_file_writer.h
b/be/src/storage/index/index_file_writer.h
index d968dc5fa12..a303de8b68c 100644
--- a/be/src/storage/index/index_file_writer.h
+++ b/be/src/storage/index/index_file_writer.h
@@ -50,7 +50,8 @@ class IndexFileWriter {
public:
IndexFileWriter(io::FileSystemSPtr fs, std::string index_path_prefix,
std::string rowset_id,
int64_t seg_id, InvertedIndexStorageFormatPB
storage_format,
- io::FileWriterPtr file_writer = nullptr, bool
can_use_ram_dir = true);
+ io::FileWriterPtr file_writer = nullptr, bool
can_use_ram_dir = true,
+ int64_t tablet_id = -1);
virtual ~IndexFileWriter() = default;
MOCK_FUNCTION Result<std::shared_ptr<DorisFSDirectory>> open(const
TabletIndex* index_meta);
@@ -111,6 +112,7 @@ private:
bool _can_use_ram_dir = true;
IndexStorageFormatPtr _index_storage_format;
+ int64_t _tablet_id = -1;
friend class IndexStorageFormatV1;
friend class IndexStorageFormatV2;
diff --git a/be/src/storage/index/inverted/inverted_index_fs_directory.cpp
b/be/src/storage/index/inverted/inverted_index_fs_directory.cpp
index 6b2c4a80b05..e65025b25a4 100644
--- a/be/src/storage/index/inverted/inverted_index_fs_directory.cpp
+++ b/be/src/storage/index/inverted/inverted_index_fs_directory.cpp
@@ -87,7 +87,8 @@ const char* const DorisFSDirectory::WRITE_LOCK_FILE =
"write.lock";
bool DorisFSDirectory::FSIndexInput::open(const io::FileSystemSPtr& fs, const
char* path,
IndexInput*& ret, CLuceneError&
error,
- int32_t buffer_size, int64_t
file_size) {
+ int32_t buffer_size, int64_t
file_size,
+ int64_t tablet_id) {
CND_PRECONDITION(path != nullptr, "path is NULL");
if (buffer_size == -1) {
@@ -100,6 +101,7 @@ bool DorisFSDirectory::FSIndexInput::open(const
io::FileSystemSPtr& fs, const ch
:
io::FileCachePolicy::NO_CACHE;
reader_options.is_doris_table = true;
reader_options.file_size = file_size;
+ reader_options.tablet_id = tablet_id;
Status st = fs->open_file(path, &h->_reader, &reader_options);
DBUG_EXECUTE_IF("inverted file read error: index file not found",
{ st = Status::Error<doris::ErrorCode::NOT_FOUND>("index
file not found"); })
diff --git a/be/src/storage/index/inverted/inverted_index_fs_directory.h
b/be/src/storage/index/inverted/inverted_index_fs_directory.h
index 7b1ac0bdf55..79854df88d2 100644
--- a/be/src/storage/index/inverted/inverted_index_fs_directory.h
+++ b/be/src/storage/index/inverted/inverted_index_fs_directory.h
@@ -190,7 +190,8 @@ protected:
public:
static bool open(const io::FileSystemSPtr& fs, const char* path,
IndexInput*& ret,
- CLuceneError& error, int32_t bufferSize = -1, int64_t
file_size = -1);
+ CLuceneError& error, int32_t bufferSize = -1, int64_t
file_size = -1,
+ int64_t tablet_id = -1);
~FSIndexInput() override;
IndexInput* clone() const override;
diff --git a/be/src/storage/rowset/beta_rowset.cpp
b/be/src/storage/rowset/beta_rowset.cpp
index e94430731b1..96fb7a36889 100644
--- a/be/src/storage/rowset/beta_rowset.cpp
+++ b/be/src/storage/rowset/beta_rowset.cpp
@@ -633,6 +633,7 @@ Status BetaRowset::check_current_rowset_segment() {
.is_doris_table = true,
.cache_base_path {},
.file_size = _rowset_meta->segment_file_size(seg_id),
+ .tablet_id = _rowset_meta->tablet_id(),
};
auto s = segment_v2::Segment::open(fs, seg_path,
_rowset_meta->tablet_id(), seg_id,
@@ -843,7 +844,8 @@ Status BetaRowset::show_nested_index_file(rapidjson::Value*
rowset_value,
auto seg_path = DORIS_TRY(segment_path(seg_id));
auto index_file_path_prefix =
InvertedIndexDescriptor::get_index_file_path_prefix(seg_path);
auto index_file_reader = std::make_unique<IndexFileReader>(
- fs, std::string(index_file_path_prefix), storage_format);
+ fs, std::string(index_file_path_prefix), storage_format,
InvertedIndexFileInfo(),
+ _rowset_meta->tablet_id());
RETURN_IF_ERROR(index_file_reader->init());
auto dirs = index_file_reader->get_all_directories();
diff --git a/be/src/storage/rowset/beta_rowset_writer.cpp
b/be/src/storage/rowset/beta_rowset_writer.cpp
index 25216e3de9c..94bd38a16ef 100644
--- a/be/src/storage/rowset/beta_rowset_writer.cpp
+++ b/be/src/storage/rowset/beta_rowset_writer.cpp
@@ -442,6 +442,7 @@ Status
BetaRowsetWriter::_load_noncompacted_segment(segment_v2::SegmentSharedPtr
: io::FileCachePolicy::NO_CACHE,
.is_doris_table = true,
.cache_base_path {},
+ .tablet_id = _rowset_meta->tablet_id(),
};
auto s = segment_v2::Segment::open(fs, path, _rowset_meta->tablet_id(),
segment_id, rowset_id(),
_context.tablet_schema, reader_options,
&segment);
@@ -1107,7 +1108,7 @@ Status
BetaRowsetWriter::create_segment_writer_for_segcompaction(
index_file_writer = std::make_unique<IndexFileWriter>(
_context.fs(), prefix, _context.rowset_id.to_string(),
_num_segcompacted,
_context.tablet_schema->get_inverted_index_storage_format(),
- std::move(idx_file_writer));
+ std::move(idx_file_writer), true /* can_use_ram_dir */,
_context.tablet_id);
}
segment_v2::SegmentWriterOptions writer_options;
diff --git a/be/src/storage/rowset/rowset_writer.h
b/be/src/storage/rowset/rowset_writer.h
index 693ebfe4169..75c0bff084b 100644
--- a/be/src/storage/rowset/rowset_writer.h
+++ b/be/src/storage/rowset/rowset_writer.h
@@ -114,7 +114,7 @@ public:
*index_file_writer = std::make_unique<IndexFileWriter>(
_context.fs(), segment_prefix, _context.rowset_id.to_string(),
segment_id,
_context.tablet_schema->get_inverted_index_storage_format(),
- std::move(idx_file_v2_ptr), can_use_ram_dir);
+ std::move(idx_file_v2_ptr), can_use_ram_dir,
_context.tablet_id);
return Status::OK();
}
diff --git a/be/src/storage/segment/segment.cpp
b/be/src/storage/segment/segment.cpp
index fdeeb3e752c..8f768ae4f89 100644
--- a/be/src/storage/segment/segment.cpp
+++ b/be/src/storage/segment/segment.cpp
@@ -87,8 +87,15 @@ Status Segment::open(io::FileSystemSPtr fs, const
std::string& path, int64_t tab
uint32_t segment_id, RowsetId rowset_id, TabletSchemaSPtr
tablet_schema,
const io::FileReaderOptions& reader_options,
std::shared_ptr<Segment>* output,
InvertedIndexFileInfo idx_file_info,
OlapReaderStatistics* stats) {
- auto s = _open(fs, path, segment_id, rowset_id, tablet_schema,
reader_options, output,
+ // Ensure tablet_id is available in reader_options for
CachedRemoteFileReader peer read.
+ io::FileReaderOptions opts_with_tablet = reader_options;
+ opts_with_tablet.tablet_id = tablet_id;
+
+ auto s = _open(fs, path, segment_id, rowset_id, tablet_schema,
opts_with_tablet, output,
idx_file_info, stats);
+ if (s.ok() && output && *output) {
+ (*output)->_tablet_id = tablet_id;
+ }
if (!s.ok()) {
if (!config::is_cloud_mode()) {
auto res = ExecEnv::get_tablet(tablet_id);
@@ -235,7 +242,7 @@ Status Segment::_open_index_file_reader() {
_fs,
std::string {InvertedIndexDescriptor::get_index_file_path_prefix(
_file_reader->path().native())},
- _tablet_schema->get_inverted_index_storage_format(),
_idx_file_info);
+ _tablet_schema->get_inverted_index_storage_format(),
_idx_file_info, _tablet_id);
return Status::OK();
}
diff --git a/be/src/storage/segment/segment.h b/be/src/storage/segment/segment.h
index 4e434680140..f6e8dc6db52 100644
--- a/be/src/storage/segment/segment.h
+++ b/be/src/storage/segment/segment.h
@@ -300,6 +300,7 @@ private:
DorisCallOnce<Status> _index_file_reader_open;
InvertedIndexFileInfo _idx_file_info;
+ int64_t _tablet_id = -1;
int _be_exec_version = BeExecVersionManager::get_newest_version();
};
diff --git a/be/src/storage/task/index_builder.cpp
b/be/src/storage/task/index_builder.cpp
index b7e2db3b4b6..93c70bf0cdb 100644
--- a/be/src/storage/task/index_builder.cpp
+++ b/be/src/storage/task/index_builder.cpp
@@ -279,7 +279,8 @@ Status IndexBuilder::update_inverted_index_info() {
auto idx_file_reader = std::make_unique<IndexFileReader>(
context.fs(),
std::string
{InvertedIndexDescriptor::get_index_file_path_prefix(seg_path)},
-
output_rs_tablet_schema->get_inverted_index_storage_format());
+
output_rs_tablet_schema->get_inverted_index_storage_format(),
+ InvertedIndexFileInfo(), _tablet->tablet_id());
auto st = idx_file_reader->init();
DBUG_EXECUTE_IF(
"IndexBuilder::update_inverted_index_info_index_file_reader_init_not_ok", {
@@ -372,7 +373,7 @@ Status
IndexBuilder::handle_single_rowset(RowsetMetaSharedPtr output_rowset_meta
fs, std::move(index_path_prefix),
output_rowset_meta->rowset_id().to_string(),
seg_ptr->id(),
output_rowset_schema->get_inverted_index_storage_format(),
- std::move(file_writer));
+ std::move(file_writer), true /* can_use_ram_dir */,
_tablet->tablet_id());
RETURN_IF_ERROR(index_file_writer->initialize(dirs));
// create inverted index writer
for (auto& index_meta : _dropped_inverted_indexes) {
@@ -443,12 +444,13 @@ Status
IndexBuilder::handle_single_rowset(RowsetMetaSharedPtr output_rowset_meta
index_file_writer = std::make_unique<IndexFileWriter>(
fs, index_path_prefix,
output_rowset_meta->rowset_id().to_string(),
seg_ptr->id(),
output_rowset_schema->get_inverted_index_storage_format(),
- std::move(file_writer));
+ std::move(file_writer), true /* can_use_ram_dir */,
_tablet->tablet_id());
RETURN_IF_ERROR(index_file_writer->initialize(dirs));
} else {
index_file_writer = std::make_unique<IndexFileWriter>(
fs, index_path_prefix,
output_rowset_meta->rowset_id().to_string(),
- seg_ptr->id(),
output_rowset_schema->get_inverted_index_storage_format());
+ seg_ptr->id(),
output_rowset_schema->get_inverted_index_storage_format(),
+ nullptr, true /* can_use_ram_dir */,
_tablet->tablet_id());
}
// create inverted index writer, or ann index writer
for (auto inverted_index : _alter_inverted_indexes) {
diff --git a/be/test/io/cache/block_file_cache_test.cpp
b/be/test/io/cache/block_file_cache_test.cpp
index c2fe61d5d36..cf417b4f347 100644
--- a/be/test/io/cache/block_file_cache_test.cpp
+++ b/be/test/io/cache/block_file_cache_test.cpp
@@ -3334,6 +3334,7 @@ TEST_F(BlockFileCacheTest, cached_remote_file_reader) {
io::FileReaderOptions opts;
opts.cache_type = io::cache_type_from_string("file_block_cache");
opts.is_doris_table = true;
+ opts.tablet_id = 10086;
CachedRemoteFileReader reader(local_reader, opts);
auto key = io::BlockFileCache::hash("tmp_file");
EXPECT_EQ(reader._cache_hash, key);
@@ -3447,6 +3448,7 @@ TEST_F(BlockFileCacheTest,
cached_remote_file_reader_tail) {
io::FileReaderOptions opts;
opts.cache_type = io::cache_type_from_string("file_block_cache");
opts.is_doris_table = true;
+ opts.tablet_id = 10086;
CachedRemoteFileReader reader(local_reader, opts);
{
std::string buffer;
@@ -3520,6 +3522,7 @@ TEST_F(BlockFileCacheTest,
cached_remote_file_reader_error_handle) {
io::FileReaderOptions opts;
opts.cache_type = io::cache_type_from_string("file_block_cache");
opts.is_doris_table = true;
+ opts.tablet_id = 10086;
CachedRemoteFileReader reader(local_reader, opts);
auto sp = SyncPoint::get_instance();
sp->enable_processing();
@@ -3605,6 +3608,7 @@ TEST_F(BlockFileCacheTest,
cached_remote_file_reader_self_heal_on_downloaded_not
io::FileReaderOptions opts;
opts.cache_type = io::cache_type_from_string("file_block_cache");
opts.is_doris_table = true;
+ opts.tablet_id = 10086;
CachedRemoteFileReader reader(local_reader, opts);
uint64_t before_self_heal =
g_read_cache_self_heal_on_not_found.get_value();
@@ -3701,6 +3705,7 @@ TEST_F(BlockFileCacheTest,
cached_remote_file_reader_no_self_heal_on_non_not_fou
io::FileReaderOptions opts;
opts.cache_type = io::cache_type_from_string("file_block_cache");
opts.is_doris_table = true;
+ opts.tablet_id = 10086;
CachedRemoteFileReader reader(local_reader, opts);
std::string buffer(64_kb, '\0');
@@ -3849,6 +3854,7 @@ TEST_F(BlockFileCacheTest,
cached_remote_file_reader_concurrent) {
io::FileReaderOptions opts;
opts.cache_type = io::cache_type_from_string("file_block_cache");
opts.is_doris_table = true;
+ opts.tablet_id = 10086;
bool flag1 = false;
auto reader = std::make_shared<CachedRemoteFileReader>(local_reader, opts);
auto sp = SyncPoint::get_instance();
@@ -3933,6 +3939,7 @@ TEST_F(BlockFileCacheTest,
cached_remote_file_reader_concurrent_2) {
io::FileReaderOptions opts;
opts.cache_type = io::cache_type_from_string("file_block_cache");
opts.is_doris_table = true;
+ opts.tablet_id = 10086;
auto reader = std::make_shared<CachedRemoteFileReader>(local_reader, opts);
auto sp = SyncPoint::get_instance();
sp->enable_processing();
@@ -4455,6 +4462,7 @@ TEST_F(BlockFileCacheTest,
cached_remote_file_reader_opt_lock) {
io::FileReaderOptions opts;
opts.cache_type = FileCachePolicy::FILE_BLOCK_CACHE;
opts.is_doris_table = true;
+ opts.tablet_id = 10086;
{
FileReaderSPtr local_reader;
ASSERT_TRUE(global_local_filesystem()->open_file(tmp_file,
&local_reader).ok());
@@ -7055,6 +7063,7 @@ TEST_F(BlockFileCacheTest,
reader_dryrun_when_download_file_cache) {
io::FileReaderOptions opts;
opts.cache_type = io::cache_type_from_string("file_block_cache");
opts.is_doris_table = true;
+ opts.tablet_id = 10086;
CachedRemoteFileReader reader(local_reader, opts);
auto key = io::BlockFileCache::hash("tmp_file");
EXPECT_EQ(reader._cache_hash, key);
@@ -7550,6 +7559,7 @@ TEST_F(BlockFileCacheTest,
cached_remote_file_reader_ttl_index) {
io::FileReaderOptions opts;
opts.cache_type = io::cache_type_from_string("file_block_cache");
opts.is_doris_table = true;
+ opts.tablet_id = 10086;
CachedRemoteFileReader reader(local_reader, opts);
auto key = io::BlockFileCache::hash("tmp_file");
EXPECT_EQ(reader._cache_hash, key);
@@ -7631,6 +7641,7 @@ TEST_F(BlockFileCacheTest,
cached_remote_file_reader_normal_index) {
io::FileReaderOptions opts;
opts.cache_type = io::cache_type_from_string("file_block_cache");
opts.is_doris_table = true;
+ opts.tablet_id = 10086;
CachedRemoteFileReader reader(local_reader, opts);
auto key = io::BlockFileCache::hash("tmp_file");
EXPECT_EQ(reader._cache_hash, key);
@@ -7786,6 +7797,7 @@ TEST_F(BlockFileCacheTest,
DISABLE_cached_remote_file_reader_direct_read_and_evi
io::FileReaderOptions opts;
opts.cache_type = io::cache_type_from_string("file_block_cache");
opts.is_doris_table = true;
+ opts.tablet_id = 10086;
auto reader = std::make_shared<CachedRemoteFileReader>(local_reader, opts);
std::string buffer;
@@ -7878,6 +7890,7 @@ TEST_F(BlockFileCacheTest,
cached_remote_file_reader_direct_read_bytes_check) {
io::FileReaderOptions opts;
opts.cache_type = io::cache_type_from_string("file_block_cache");
opts.is_doris_table = true;
+ opts.tablet_id = 10086;
auto reader = std::make_shared<CachedRemoteFileReader>(local_reader, opts);
std::string buffer;
diff --git a/be/test/io/cache/block_file_cache_test_lru_dump.cpp
b/be/test/io/cache/block_file_cache_test_lru_dump.cpp
index e8a373568ef..a5d06b5abbc 100644
--- a/be/test/io/cache/block_file_cache_test_lru_dump.cpp
+++ b/be/test/io/cache/block_file_cache_test_lru_dump.cpp
@@ -528,6 +528,7 @@ TEST_F(BlockFileCacheTest,
cached_remote_file_reader_direct_read_order_check) {
io::FileReaderOptions opts;
opts.cache_type = io::cache_type_from_string("file_block_cache");
opts.is_doris_table = true;
+ opts.tablet_id = 10086;
auto reader = std::make_shared<CachedRemoteFileReader>(local_reader, opts);
std::string buffer;
diff --git a/be/test/io/fs/packed_file_concurrency_test.cpp
b/be/test/io/fs/packed_file_concurrency_test.cpp
index 1e581ff59b8..14c41d53a7b 100644
--- a/be/test/io/fs/packed_file_concurrency_test.cpp
+++ b/be/test/io/fs/packed_file_concurrency_test.cpp
@@ -456,6 +456,7 @@ protected:
FileReaderOptions local_opts = opts ? *opts : FileReaderOptions();
local_opts.cache_type = FileCachePolicy::FILE_BLOCK_CACHE;
local_opts.is_doris_table = true;
+ local_opts.tablet_id = 10086;
*reader = std::make_shared<CachedRemoteFileReader>(raw, local_opts);
return Status::OK();
}
@@ -722,6 +723,7 @@ TEST_F(MergeFileConcurrencyTest,
ConcurrentWriteReadCorrectness) {
FileReaderOptions opts;
opts.cache_type = FileCachePolicy::FILE_BLOCK_CACHE;
opts.is_doris_table = true;
+ opts.tablet_id = 10086;
ASSERT_TRUE(reader_fs.open_file(Path(path), &reader,
&opts).ok());
// After the fix, CachedRemoteFileReader wraps
PackedFileReader (not vice versa)
// This ensures cache key uses segment path for proper cleanup
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]