This is an automated email from the ASF dual-hosted git repository.
eldenmoon pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-4.1 by this push:
new 82b2eab76b0 branch-4.1: [fix](file cache) guard null IOContext in cached remote reader #63842 (#63869)
82b2eab76b0 is described below
commit 82b2eab76b06d10936a8f91daac3a8d162aef766
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Fri May 29 15:19:28 2026 +0800
branch-4.1: [fix](file cache) guard null IOContext in cached remote reader #63842 (#63869)
Cherry-picked from #63842
Co-authored-by: lihangyu <[email protected]>
---
be/src/format/native/native_reader.cpp | 11 +++---
be/src/io/cache/cached_remote_file_reader.cpp | 6 ++-
be/test/io/cache/block_file_cache_test.cpp | 54 +++++++++++++++++++++++++++
3 files changed, 65 insertions(+), 6 deletions(-)
diff --git a/be/src/format/native/native_reader.cpp
b/be/src/format/native/native_reader.cpp
index 8693a3e9a22..0cb0c1ca72a 100644
--- a/be/src/format/native/native_reader.cpp
+++ b/be/src/format/native/native_reader.cpp
@@ -47,7 +47,8 @@ NativeReader::~NativeReader() {
namespace {
Status validate_and_consume_header(io::FileReaderSPtr file_reader, const
TFileRangeDesc& range,
- int64_t* file_size, int64_t*
current_offset, bool* eof) {
+ int64_t* file_size, int64_t*
current_offset, bool* eof,
+ const io::IOContext* io_ctx) {
*file_size = file_reader->size();
*current_offset = 0;
*eof = (*file_size == 0);
@@ -65,7 +66,7 @@ Status validate_and_consume_header(io::FileReaderSPtr
file_reader, const TFileRa
char header[HEADER_SIZE];
Slice header_slice(header, sizeof(header));
size_t bytes_read = 0;
- RETURN_IF_ERROR(file_reader->read_at(0, header_slice, &bytes_read));
+ RETURN_IF_ERROR(file_reader->read_at(0, header_slice, &bytes_read,
io_ctx));
if (bytes_read != sizeof(header)) {
return Status::InternalError(
"failed to read Doris Native header from file {}, expect {}
bytes, got {} bytes",
@@ -142,7 +143,7 @@ Status NativeReader::init_reader() {
}
RETURN_IF_ERROR(validate_and_consume_header(_file_reader, _scan_range,
&_file_size,
- &_current_offset, &_eof));
+ &_current_offset, &_eof,
_io_ctx));
return Status::OK();
}
@@ -314,7 +315,7 @@ Status NativeReader::_read_next_pblock(std::string* buff,
bool* eof) {
uint64_t len = 0;
Slice len_slice(reinterpret_cast<char*>(&len), sizeof(len));
size_t bytes_read = 0;
- RETURN_IF_ERROR(_file_reader->read_at(_current_offset, len_slice,
&bytes_read));
+ RETURN_IF_ERROR(_file_reader->read_at(_current_offset, len_slice,
&bytes_read, _io_ctx));
if (bytes_read == 0) {
*eof = true;
return Status::OK();
@@ -336,7 +337,7 @@ Status NativeReader::_read_next_pblock(std::string* buff,
bool* eof) {
buff->assign(len, '\0');
Slice data_slice(buff->data(), len);
bytes_read = 0;
- RETURN_IF_ERROR(_file_reader->read_at(_current_offset, data_slice,
&bytes_read));
+ RETURN_IF_ERROR(_file_reader->read_at(_current_offset, data_slice,
&bytes_read, _io_ctx));
if (bytes_read != len) {
return Status::InternalError(
"Failed to read native block body from file {}, expect {}, "
diff --git a/be/src/io/cache/cached_remote_file_reader.cpp
b/be/src/io/cache/cached_remote_file_reader.cpp
index da4758e8b99..5cdb522cfcf 100644
--- a/be/src/io/cache/cached_remote_file_reader.cpp
+++ b/be/src/io/cache/cached_remote_file_reader.cpp
@@ -288,9 +288,13 @@ Status CachedRemoteFileReader::read_at_impl(size_t offset,
Slice result, size_t*
size_t already_read = 0;
SCOPED_CONCURRENCY_COUNT(ConcurrencyStatsManager::instance().cached_remote_reader_read_at);
+ const IOContext default_io_ctx;
+ if (io_ctx == nullptr) {
+ io_ctx = &default_io_ctx;
+ }
+ DCHECK(io_ctx);
const bool is_dryrun = io_ctx->is_dryrun;
DCHECK(!closed());
- DCHECK(io_ctx);
if (offset > size()) {
return Status::InvalidArgument(
fmt::format("offset exceeds file size(offset: {}, file size:
{}, path: {})", offset,
diff --git a/be/test/io/cache/block_file_cache_test.cpp
b/be/test/io/cache/block_file_cache_test.cpp
index d4ae09df5b7..09bf1afc8fb 100644
--- a/be/test/io/cache/block_file_cache_test.cpp
+++ b/be/test/io/cache/block_file_cache_test.cpp
@@ -3414,6 +3414,60 @@ TEST_F(BlockFileCacheTest, cached_remote_file_reader) {
FileCacheFactory::instance()->_capacity = 0;
}
+// Regression test for the null-IOContext guard added to CachedRemoteFileReader::read_at_impl.
+// read_at() is called below without an explicit IOContext argument. The reader must therefore
+// fall back to a default-constructed IOContext instead of dereferencing a null pointer
+// (read_at_impl reads io_ctx->is_dryrun before anything else).
+// NOTE(review): assumes read_at's io_ctx parameter defaults to nullptr — confirm in the header.
+TEST_F(BlockFileCacheTest, cached_remote_file_reader_accepts_null_io_context) {
+ // Start from an empty cache directory dedicated to this test.
+ std::string cache_base_path =
+ caches_dir / "cached_remote_file_reader_accepts_null_io_context" /
"";
+ if (fs::exists(cache_base_path)) {
+ fs::remove_all(cache_base_path);
+ }
+ fs::create_directories(cache_base_path);
+
+ // Small cache layout: 6 MiB query queue, 1 MiB index and disposable queues,
+ // 8 MiB total capacity, 1 MiB maximum file block size.
+ io::FileCacheSettings settings;
+ settings.query_queue_size = 6291456;
+ settings.query_queue_elements = 6;
+ settings.index_queue_size = 1048576;
+ settings.index_queue_elements = 1;
+ settings.disposable_queue_size = 1048576;
+ settings.disposable_queue_elements = 1;
+ settings.capacity = 8388608;
+ settings.max_file_block_size = 1048576;
+ settings.max_query_cache_size = 0;
+
+ // Register the cache for cache_base_path with the global factory.
ASSERT_TRUE(FileCacheFactory::instance()->create_file_cache(cache_base_path,
settings).ok());
+
+ // Poll for up to ~100 ms (100 x 1 ms) for the cache's asynchronous open to complete.
+ // The test continues even if the open has not finished by then.
+ auto cache = FileCacheFactory::instance()->_path_to_cache[cache_base_path];
+ for (int i = 0; i < 100; ++i) {
+ if (cache->get_async_open_success()) {
+ break;
+ }
+ std::this_thread::sleep_for(std::chrono::milliseconds(1));
+ }
+
+ // A local file reader over tmp_file stands in for the remote file.
+ FileReaderSPtr local_reader;
+ ASSERT_TRUE(global_local_filesystem()->open_file(tmp_file, &local_reader));
+
+ // Wrap that reader in a CachedRemoteFileReader that uses the file-block cache.
+ io::FileReaderOptions opts;
+ opts.cache_type = io::cache_type_from_string("file_block_cache");
+ opts.is_doris_table = true;
+ opts.tablet_id = 10086;
+ CachedRemoteFileReader reader(local_reader, opts);
+
+ // Read the first 64 KiB with no IOContext passed. This is the call under test.
+ // Expected content is ASCII '0' bytes.
+ // NOTE(review): presumes tmp_file is filled with '0' by the fixture — verify.
+ std::string buffer(64_kb, '\0');
+ size_t bytes_read {0};
+ ASSERT_TRUE(reader.read_at(0, Slice(buffer.data(), buffer.size()),
&bytes_read).ok());
+ EXPECT_EQ(bytes_read, 64_kb);
+ EXPECT_EQ(std::string(64_kb, '0'), buffer);
+
+ EXPECT_TRUE(reader.close().ok());
+ EXPECT_TRUE(reader.closed());
+ // NOTE(review): this sleep presumably lets background cache work settle before the
+ // directory is removed — confirm it is still needed.
+ std::this_thread::sleep_for(std::chrono::seconds(1));
+ if (fs::exists(cache_base_path)) {
+ fs::remove_all(cache_base_path);
+ }
+ // Reset the FileCacheFactory registries and capacity that this test populated.
+ FileCacheFactory::instance()->_caches.clear();
+ FileCacheFactory::instance()->_path_to_cache.clear();
+ FileCacheFactory::instance()->_capacity = 0;
+}
+
TEST_F(BlockFileCacheTest, cached_remote_file_reader_tail) {
std::string cache_base_path = caches_dir /
"cached_remote_file_reader_tail" / "";
if (fs::exists(cache_base_path)) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]