This is an automated email from the ASF dual-hosted git repository.
gavinchou pushed a commit to branch cs_opt_version-3.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/cs_opt_version-3.1 by this push:
new 824ac04e9f2 [opt](cloud) Support prefetch parallel for single file (#57524)
824ac04e9f2 is described below
commit 824ac04e9f2d5052f7b7b182f63917190fe50724
Author: Gavin Chou <[email protected]>
AuthorDate: Tue Nov 4 15:13:08 2025 +0800
[opt](cloud) Support prefetch parallel for single file (#57524)
---
be/src/common/config.cpp | 7 +++
be/src/common/config.h | 2 +
be/src/io/cache/cached_remote_file_reader.cpp | 79 ++++++++++++++++++++++-----
be/src/io/cache/cached_remote_file_reader.h | 4 ++
be/src/olap/like_column_predicate.h | 7 +--
5 files changed, 82 insertions(+), 17 deletions(-)
diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 243cd65351a..d19f6381028 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -1143,6 +1143,13 @@
DEFINE_mInt64(file_cache_background_lru_dump_interval_ms, "60000");
DEFINE_mInt64(file_cache_background_lru_dump_update_cnt_threshold, "1000");
DEFINE_mInt64(file_cache_background_lru_dump_tail_record_num, "5000000");
DEFINE_mInt64(file_cache_background_lru_log_replay_interval_ms, "1000");
+// number of parallel prefetches to issue when a read misses the cache (0 disables prefetch)
+DEFINE_mInt32(file_cache_num_parallel_prefetch, "0");
+// if the tail read is less than `threshold`, we extend this read with an extra
+// block of data, e.g. by default, if the read is the tail 10KB, the actual IO is
+// config::file_cache_each_block_size + 10KB
+// if the tail read is 101KB, the actual IO is 101KB
+DEFINE_mInt64(file_cache_tail_read_extra_bytes_threshold, "102400");
DEFINE_mBool(enable_evaluate_shadow_queue_diff, "false");
DEFINE_Int32(file_cache_downloader_thread_num_min, "32");
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 55591fc36c7..a17768fae6e 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -1180,6 +1180,8 @@
DECLARE_mInt64(file_cache_background_lru_dump_interval_ms);
DECLARE_mInt64(file_cache_background_lru_dump_update_cnt_threshold);
DECLARE_mInt64(file_cache_background_lru_dump_tail_record_num);
DECLARE_mInt64(file_cache_background_lru_log_replay_interval_ms);
+DECLARE_mInt32(file_cache_num_parallel_prefetch);
+DECLARE_mInt64(file_cache_tail_read_extra_bytes_threshold);
DECLARE_mBool(enable_evaluate_shadow_queue_diff);
// inverted index searcher cache
diff --git a/be/src/io/cache/cached_remote_file_reader.cpp b/be/src/io/cache/cached_remote_file_reader.cpp
index b89bdcf2f6d..429a1f625f9 100644
--- a/be/src/io/cache/cached_remote_file_reader.cpp
+++ b/be/src/io/cache/cached_remote_file_reader.cpp
@@ -36,9 +36,11 @@
#include "io/fs/file_reader.h"
#include "io/fs/local_file_system.h"
#include "io/io_common.h"
+#include "runtime/exec_env.h"
#include "util/bit_util.h"
#include "util/doris_metrics.h"
#include "util/runtime_profile.h"
+#include "vec/exec/scan/scanner_scheduler.h"
namespace doris::io {
@@ -47,6 +49,7 @@ bvar::LatencyRecorder
g_skip_cache_num("cached_remote_reader_skip_cache_num");
bvar::Adder<uint64_t> g_skip_cache_sum("cached_remote_reader_skip_cache_sum");
bvar::Adder<uint64_t> g_skip_local_cache_io_sum_bytes(
"cached_remote_reader_skip_local_cache_io_sum_bytes");
+bvar::LatencyRecorder
g_read_at_req_bytes("cached_remote_reader_read_at_req_bytes");
CachedRemoteFileReader::CachedRemoteFileReader(FileReaderSPtr
remote_file_reader,
const FileReaderOptions& opts)
@@ -86,6 +89,10 @@ void
CachedRemoteFileReader::_insert_file_reader(FileBlockSPtr file_block) {
}
CachedRemoteFileReader::~CachedRemoteFileReader() {
+ {
+ std::unique_lock l(_parallel_mtx);
+ _parallel_cv.wait(l, [this] { return _parallel_ref == 0; });
+ }
static_cast<void>(close());
}
@@ -94,24 +101,41 @@ Status CachedRemoteFileReader::close() {
}
std::pair<size_t, size_t> CachedRemoteFileReader::s_align_size(size_t offset,
size_t read_size,
- size_t length) {
- size_t left = offset;
- size_t right = offset + read_size - 1;
- size_t align_left =
- (left / config::file_cache_each_block_size) *
config::file_cache_each_block_size;
- size_t align_right =
- (right / config::file_cache_each_block_size + 1) *
config::file_cache_each_block_size;
- align_right = align_right < length ? align_right : length;
- size_t align_size = align_right - align_left;
- if (align_size < config::file_cache_each_block_size && align_left != 0) {
- align_size += config::file_cache_each_block_size;
- align_left -= config::file_cache_each_block_size;
+ size_t
file_length) {
+ const static size_t block_size = config::file_cache_each_block_size;
+
+ // Calculate the original read range [start, end)
+ size_t start_pos = offset;
+ size_t end_pos = offset + read_size;
+
+ // Align start position to the previous block boundary
+ size_t aligned_start = (start_pos / block_size) * block_size;
+
+ // Align end position to the next block boundary
+ size_t aligned_end = ((end_pos - 1) / block_size + 1) * block_size;
+
+ // Ensure we don't exceed file boundaries
+ aligned_end = std::min(aligned_end, file_length);
+
+ size_t aligned_size = aligned_end - aligned_start;
+
+ if (aligned_size > config::file_cache_tail_read_extra_bytes_threshold) {
+ return {aligned_start, aligned_size};
}
- return std::make_pair(align_left, align_size);
+
+ // Special case: if aligned size is smaller than a block and we're not at file start,
+ // extend backwards to include a full block
+ if (aligned_size < block_size && aligned_start > 0) {
+ aligned_start -= block_size;
+ aligned_size += block_size;
+ }
+
+ return {aligned_start, aligned_size};
}
Status CachedRemoteFileReader::read_at_impl(size_t offset, Slice result,
size_t* bytes_read,
const IOContext* io_ctx) {
+ g_read_at_req_bytes << result.size;
const bool is_dryrun = io_ctx->is_dryrun;
DCHECK(!closed());
DCHECK(io_ctx);
@@ -183,6 +207,35 @@ Status CachedRemoteFileReader::read_at_impl(size_t offset,
Slice result, size_t*
}
// read from cache or remote
auto [align_left, align_size] = s_align_size(offset, bytes_req, size());
+ if (config::file_cache_num_parallel_prefetch > 0 && !is_dryrun &&
_is_doris_table) {
+ auto off = align_left + align_size;
+ if (off < _remote_file_reader->size()) { // there may be more to read
+ auto ioctx = *io_ctx;
+ ioctx.is_dryrun = true;
+ ioctx.query_id = nullptr;
+ ioctx.file_cache_stats = nullptr;
+ ioctx.file_reader_stats = nullptr;
+ auto pool =
ExecEnv::GetInstance()->scanner_scheduler()->get_remote_scan_thread_pool();
+ {
+ std::unique_lock l(_parallel_mtx);
+ _parallel_ref++;
+ }
+ auto st = pool->submit_scan_task(vectorized::SimplifiedScanTask(
+ [ioctx, off, this] {
+ size_t bytesread;
+ Slice r((char*)0x0,
config::file_cache_each_block_size);
+ (void)read_at_impl(off, r, &bytesread, &ioctx);
+ std::unique_lock l(_parallel_mtx);
+ _parallel_ref--;
+ _parallel_cv.notify_one();
+ },
+ nullptr));
+ if (!st.ok()) {
+ std::unique_lock l(_parallel_mtx);
+ _parallel_ref--;
+ }
+ }
+ }
CacheContext cache_context(io_ctx);
cache_context.stats = &stats;
MonotonicStopWatch sw;
diff --git a/be/src/io/cache/cached_remote_file_reader.h b/be/src/io/cache/cached_remote_file_reader.h
index 94e8a5807ba..04d8cc69af8 100644
--- a/be/src/io/cache/cached_remote_file_reader.h
+++ b/be/src/io/cache/cached_remote_file_reader.h
@@ -69,6 +69,10 @@ private:
void _update_stats(const ReadStatistics& stats, FileCacheStatistics* state,
bool is_inverted_index) const;
+
+ std::atomic<int> _parallel_ref {0};
+ std::mutex _parallel_mtx;
+ std::condition_variable _parallel_cv;
};
} // namespace doris::io
diff --git a/be/src/olap/like_column_predicate.h b/be/src/olap/like_column_predicate.h
index de5ed6767b4..52ace5807b2 100644
--- a/be/src/olap/like_column_predicate.h
+++ b/be/src/olap/like_column_predicate.h
@@ -149,8 +149,7 @@ private:
const vectorized::ColumnDictI32& column) const {
std::vector<bool>* res = nullptr;
if (_segment_id_to_cached_res_flags.if_contains(
- column.get_rowset_segment_id(),
- [&res](auto& pair) { res = &pair.second; })) {
+ column.get_rowset_segment_id(), [&res](auto& pair) { res =
&pair.second; })) {
return res;
}
@@ -175,8 +174,8 @@ private:
std::pair {column.get_rowset_segment_id(), tmp_res});
}
- _segment_id_to_cached_res_flags.if_contains(
- column.get_rowset_segment_id(), [&res](auto& pair) { res =
&pair.second; });
+
_segment_id_to_cached_res_flags.if_contains(column.get_rowset_segment_id(),
+ [&res](auto& pair) { res =
&pair.second; });
return res;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]