This is an automated email from the ASF dual-hosted git repository.

eldenmoon pushed a commit to branch cs_opt_version-3.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/cs_opt_version-3.1 by this push:
     new 1f759292c70 cs_opt_3.1: [Opt](cloud) Add segment prefetcher (#58929)
1f759292c70 is described below

commit 1f759292c705499710aff7dae5ed6c79f13749d3
Author: bobhan1 <[email protected]>
AuthorDate: Fri Dec 12 14:36:54 2025 +0800

    cs_opt_3.1: [Opt](cloud) Add segment prefetcher (#58929)
---
 be/src/common/config.cpp                           |  23 ++-
 be/src/common/config.h                             |  19 +++
 be/src/io/cache/block_file_cache.cpp               |   3 +
 be/src/io/cache/cached_remote_file_reader.cpp      |  92 +++++++++--
 be/src/io/cache/cached_remote_file_reader.h        |  15 +-
 be/src/io/fs/s3_file_reader.cpp                    |   3 +
 be/src/olap/olap_common.h                          |   1 +
 be/src/olap/rowset/segment_v2/column_reader.cpp    |  45 ++++++
 be/src/olap/rowset/segment_v2/column_reader.h      |  19 +++
 be/src/olap/rowset/segment_v2/ordinal_page_index.h |   1 +
 be/src/olap/rowset/segment_v2/page_io.cpp          |   4 +
 be/src/olap/rowset/segment_v2/segment_iterator.cpp | 103 +++++++++++-
 be/src/olap/rowset/segment_v2/segment_iterator.h   |   2 +
 .../olap/rowset/segment_v2/segment_prefetcher.cpp  | 180 +++++++++++++++++++++
 be/src/olap/rowset/segment_v2/segment_prefetcher.h | 176 ++++++++++++++++++++
 be/src/pipeline/exec/olap_scan_operator.cpp        |   2 +
 be/src/pipeline/exec/olap_scan_operator.h          |   1 +
 be/src/runtime/exec_env.h                          |   3 +
 be/src/runtime/exec_env_init.cpp                   |   7 +
 be/src/service/doris_main.cpp                      |   4 +
 be/src/util/concurrency_stats.cpp                  | 131 +++++++++++++++
 be/src/util/concurrency_stats.h                    | 123 ++++++++++++++
 be/src/vec/exec/scan/new_olap_scanner.cpp          |   2 +
 be/src/vec/exec/scan/vscanner.cpp                  |   2 +
 24 files changed, 942 insertions(+), 19 deletions(-)

diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 9e8f500908b..49adb6ec70d 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -386,7 +386,7 @@ DEFINE_mBool(enable_segment_rows_consistency_check, 
"false");
 DEFINE_mBool(enable_segment_rows_check_core, "false");
 // ATTENTION: For test only. In test environment, there are no historical data,
 // so all rowset meta should have segment rows info.
-DEFINE_mBool(fail_when_segment_rows_not_in_rowset_meta,"false");
+DEFINE_mBool(fail_when_segment_rows_not_in_rowset_meta, "false");
 DEFINE_String(row_cache_mem_limit, "20%");
 
 // Cache for storage page size
@@ -1458,6 +1458,21 @@ DEFINE_mInt64(string_overflow_size, "4294967295"); // 
std::numic_limits<uint32_t
 DEFINE_Int64(num_buffered_reader_prefetch_thread_pool_min_thread, "16");
 // The max thread num for BufferedReaderPrefetchThreadPool
 DEFINE_Int64(num_buffered_reader_prefetch_thread_pool_max_thread, "64");
+
+DEFINE_mBool(enable_segment_prefetch_verbose_log, "false");
+// The thread num for SegmentPrefetchThreadPool
+DEFINE_Int64(segment_prefetch_thread_pool_thread_num_min, "32");
+DEFINE_Int64(segment_prefetch_thread_pool_thread_num_max, "2000");
+
+DEFINE_mInt32(segment_file_cache_consume_rowids_batch_size, "8000");
+// Enable segment file cache block prefetch for query
+DEFINE_mBool(enable_query_segment_file_cache_prefetch, "false");
+// Number of blocks to prefetch ahead in segment iterator for query
+DEFINE_mInt32(query_segment_file_cache_prefetch_block_size, "2");
+// Enable segment file cache block prefetch for compaction
+DEFINE_mBool(enable_compaction_segment_file_cache_prefetch, "false");
+// Number of blocks to prefetch ahead in segment iterator for compaction
+DEFINE_mInt32(compaction_segment_file_cache_prefetch_block_size, "2");
 // The min thread num for S3FileUploadThreadPool
 DEFINE_Int64(num_s3_file_upload_thread_pool_min_thread, "16");
 // The max thread num for S3FileUploadThreadPool
@@ -1580,6 +1595,12 @@ DEFINE_mBool(enable_wal_tde, "false");
 DEFINE_mBool(enable_prefill_output_dbm_agg_cache_after_compaction, "true");
 DEFINE_mBool(enable_prefill_all_dbm_agg_cache_after_compaction, "true");
 
+// Concurrency stats dump configuration
+DEFINE_mBool(enable_concurrency_stats_dump, "false");
+DEFINE_mInt32(concurrency_stats_dump_interval_ms, "100");
+DEFINE_Validator(concurrency_stats_dump_interval_ms,
+                 [](const int32_t config) -> bool { return config >= 10; });
+
 // clang-format off
 #ifdef BE_TEST
 // test s3
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 367999fdf49..1a487d78da6 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -1516,6 +1516,21 @@ DECLARE_mInt64(string_overflow_size);
 DECLARE_Int64(num_buffered_reader_prefetch_thread_pool_min_thread);
 // The max thread num for BufferedReaderPrefetchThreadPool
 DECLARE_Int64(num_buffered_reader_prefetch_thread_pool_max_thread);
+
+DECLARE_mBool(enable_segment_prefetch_verbose_log);
+// The thread num for SegmentPrefetchThreadPool
+DECLARE_Int64(segment_prefetch_thread_pool_thread_num_min);
+DECLARE_Int64(segment_prefetch_thread_pool_thread_num_max);
+
+DECLARE_mInt32(segment_file_cache_consume_rowids_batch_size);
+// Enable segment file cache block prefetch for query
+DECLARE_mBool(enable_query_segment_file_cache_prefetch);
+// Number of blocks to prefetch ahead in segment iterator for query
+DECLARE_mInt32(query_segment_file_cache_prefetch_block_size);
+// Enable segment file cache block prefetch for compaction
+DECLARE_mBool(enable_compaction_segment_file_cache_prefetch);
+// Number of blocks to prefetch ahead in segment iterator for compaction
+DECLARE_mInt32(compaction_segment_file_cache_prefetch_block_size);
 // The min thread num for S3FileUploadThreadPool
 DECLARE_Int64(num_s3_file_upload_thread_pool_min_thread);
 // The max thread num for S3FileUploadThreadPool
@@ -1641,6 +1656,10 @@ DECLARE_mBool(enable_wal_tde);
 DECLARE_mBool(enable_prefill_output_dbm_agg_cache_after_compaction);
 DECLARE_mBool(enable_prefill_all_dbm_agg_cache_after_compaction);
 
+// Concurrency stats dump configuration
+DECLARE_mBool(enable_concurrency_stats_dump);
+DECLARE_mInt32(concurrency_stats_dump_interval_ms);
+
 #ifdef BE_TEST
 // test s3
 DECLARE_String(test_s3_resource);
diff --git a/be/src/io/cache/block_file_cache.cpp 
b/be/src/io/cache/block_file_cache.cpp
index d69c3e3a07f..d219c25cb19 100644
--- a/be/src/io/cache/block_file_cache.cpp
+++ b/be/src/io/cache/block_file_cache.cpp
@@ -45,6 +45,7 @@
 #include "io/cache/file_cache_common.h"
 #include "io/cache/fs_file_cache_storage.h"
 #include "io/cache/mem_file_cache_storage.h"
+#include "util/concurrency_stats.h"
 #include "util/runtime_profile.h"
 #include "util/stopwatch.hpp"
 #include "util/thread.h"
@@ -736,7 +737,9 @@ FileBlocksHolder BlockFileCache::get_or_set(const 
UInt128Wrapper& hash, size_t o
     DCHECK(stats != nullptr);
     MonotonicStopWatch sw;
     sw.start();
+    
ConcurrencyStatsManager::instance().cached_remote_reader_get_or_set_wait_lock->increment();
     std::lock_guard cache_lock(_mutex);
+    
ConcurrencyStatsManager::instance().cached_remote_reader_get_or_set_wait_lock->decrement();
     stats->lock_wait_timer += sw.elapsed_time();
     FileBlocks file_blocks;
     int64_t duration = 0;
diff --git a/be/src/io/cache/cached_remote_file_reader.cpp 
b/be/src/io/cache/cached_remote_file_reader.cpp
index 429a1f625f9..7c92d4cbeee 100644
--- a/be/src/io/cache/cached_remote_file_reader.cpp
+++ b/be/src/io/cache/cached_remote_file_reader.cpp
@@ -38,6 +38,7 @@
 #include "io/io_common.h"
 #include "runtime/exec_env.h"
 #include "util/bit_util.h"
+#include "util/concurrency_stats.h"
 #include "util/doris_metrics.h"
 #include "util/runtime_profile.h"
 #include "vec/exec/scan/scanner_scheduler.h"
@@ -135,6 +136,8 @@ std::pair<size_t, size_t> 
CachedRemoteFileReader::s_align_size(size_t offset, si
 
 Status CachedRemoteFileReader::read_at_impl(size_t offset, Slice result, 
size_t* bytes_read,
                                             const IOContext* io_ctx) {
+    
SCOPED_CONCURRENCY_COUNT(ConcurrencyStatsManager::instance().cached_remote_reader_read_at);
+
     g_read_at_req_bytes << result.size;
     const bool is_dryrun = io_ctx->is_dryrun;
     DCHECK(!closed());
@@ -240,8 +243,12 @@ Status CachedRemoteFileReader::read_at_impl(size_t offset, 
Slice result, size_t*
     cache_context.stats = &stats;
     MonotonicStopWatch sw;
     sw.start();
+
+    
ConcurrencyStatsManager::instance().cached_remote_reader_get_or_set->increment();
     FileBlocksHolder holder =
             _cache->get_or_set(_cache_hash, align_left, align_size, 
cache_context);
+    
ConcurrencyStatsManager::instance().cached_remote_reader_get_or_set->decrement();
+
     stats.cache_get_or_set_timer += sw.elapsed_time();
     std::vector<FileBlockSPtr> empty_blocks;
     for (auto& block : holder.file_blocks) {
@@ -279,23 +286,28 @@ Status CachedRemoteFileReader::read_at_impl(size_t 
offset, Slice result, size_t*
             RETURN_IF_ERROR(_remote_file_reader->read_at(empty_start, 
Slice(buffer.get(), size),
                                                          &size, io_ctx));
         }
-        for (auto& block : empty_blocks) {
-            if (block->state() == FileBlock::State::SKIP_CACHE) {
-                continue;
-            }
-            SCOPED_RAW_TIMER(&stats.local_write_timer);
-            char* cur_ptr = buffer.get() + block->range().left - empty_start;
-            size_t block_size = block->range().size();
-            Status st = block->append(Slice(cur_ptr, block_size));
-            if (st.ok()) {
-                st = block->finalize();
-            }
-            if (!st.ok()) {
-                LOG_EVERY_N(WARNING, 100) << "Write data to file cache failed. 
err=" << st.msg();
-            } else {
-                _insert_file_reader(block);
+        {
+            SCOPED_CONCURRENCY_COUNT(
+                    
ConcurrencyStatsManager::instance().cached_remote_reader_write_back);
+            for (auto& block : empty_blocks) {
+                if (block->state() == FileBlock::State::SKIP_CACHE) {
+                    continue;
+                }
+                SCOPED_RAW_TIMER(&stats.local_write_timer);
+                char* cur_ptr = buffer.get() + block->range().left - 
empty_start;
+                size_t block_size = block->range().size();
+                Status st = block->append(Slice(cur_ptr, block_size));
+                if (st.ok()) {
+                    st = block->finalize();
+                }
+                if (!st.ok()) {
+                    LOG_EVERY_N(WARNING, 100)
+                            << "Write data to file cache failed. err=" << 
st.msg();
+                } else {
+                    _insert_file_reader(block);
+                }
+                stats.bytes_write_into_file_cache += block_size;
             }
-            stats.bytes_write_into_file_cache += block_size;
         }
         // copy from memory directly
         size_t right_offset = offset + bytes_req - 1;
@@ -333,6 +345,8 @@ Status CachedRemoteFileReader::read_at_impl(size_t offset, 
Slice result, size_t*
         static int64_t max_wait_time = 10;
         TEST_SYNC_POINT_CALLBACK("CachedRemoteFileReader::max_wait_time", 
&max_wait_time);
         if (block_state != FileBlock::State::DOWNLOADED) {
+            SCOPED_CONCURRENCY_COUNT(
+                    
ConcurrencyStatsManager::instance().cached_remote_reader_blocking);
             do {
                 SCOPED_RAW_TIMER(&stats.remote_read_timer);
                 
TEST_SYNC_POINT_CALLBACK("CachedRemoteFileReader::DOWNLOADING");
@@ -358,6 +372,8 @@ Status CachedRemoteFileReader::read_at_impl(size_t offset, 
Slice result, size_t*
                 } else {
                     size_t file_offset = current_offset - left;
                     SCOPED_RAW_TIMER(&stats.local_read_timer);
+                    SCOPED_CONCURRENCY_COUNT(
+                            
ConcurrencyStatsManager::instance().cached_remote_reader_local_read);
                     st = block->read(Slice(result.data + (current_offset - 
offset), read_size),
                                      file_offset);
                 }
@@ -423,4 +439,48 @@ void CachedRemoteFileReader::_update_stats(const 
ReadStatistics& read_stats,
     g_skip_cache_sum << read_stats.skip_cache;
 }
 
+void CachedRemoteFileReader::prefetch_range(size_t offset, size_t size, const 
IOContext* io_ctx) {
+    if (offset >= this->size() || size == 0) {
+        return;
+    }
+
+    size = std::min(size, this->size() - offset);
+
+    ThreadPool* pool = ExecEnv::GetInstance()->segment_prefetch_thread_pool();
+    if (pool == nullptr) {
+        return;
+    }
+
+    IOContext dryrun_ctx;
+    if (io_ctx != nullptr) {
+        dryrun_ctx = *io_ctx;
+    }
+    dryrun_ctx.is_dryrun = true;
+    dryrun_ctx.query_id = nullptr;
+    dryrun_ctx.file_cache_stats = nullptr;
+    dryrun_ctx.file_reader_stats = nullptr;
+
+    LOG_IF(INFO, config::enable_segment_prefetch_verbose_log)
+            << fmt::format("[verbose] Submitting prefetch task for offset={} 
size={}, file={}",
+                           offset, size, path().filename().native());
+    std::weak_ptr<CachedRemoteFileReader> weak_this = shared_from_this();
+    auto st = pool->submit_func([weak_this, offset, size, dryrun_ctx]() {
+        auto self = weak_this.lock();
+        if (self == nullptr) {
+            return;
+        }
+        size_t bytes_read;
+        Slice dummy_buffer((char*)nullptr, size);
+        (void)self->read_at_impl(offset, dummy_buffer, &bytes_read, 
&dryrun_ctx);
+        LOG_IF(INFO, config::enable_segment_prefetch_verbose_log)
+                << fmt::format("[verbose] Prefetch task completed for 
offset={} size={}, file={}",
+                               offset, size, self->path().filename().native());
+    });
+
+    if (!st.ok()) {
+        VLOG_DEBUG << "Failed to submit prefetch task for offset=" << offset 
<< " size=" << size
+                   << " error=" << st.to_string();
+    }
+}
+
 } // namespace doris::io
diff --git a/be/src/io/cache/cached_remote_file_reader.h 
b/be/src/io/cache/cached_remote_file_reader.h
index 04d8cc69af8..36233c68aa3 100644
--- a/be/src/io/cache/cached_remote_file_reader.h
+++ b/be/src/io/cache/cached_remote_file_reader.h
@@ -36,7 +36,8 @@ namespace doris::io {
 struct IOContext;
 struct FileCacheStatistics;
 
-class CachedRemoteFileReader final : public FileReader {
+class CachedRemoteFileReader final : public FileReader,
+                                     public 
std::enable_shared_from_this<CachedRemoteFileReader> {
 public:
     CachedRemoteFileReader(FileReaderSPtr remote_file_reader, const 
FileReaderOptions& opts);
 
@@ -54,6 +55,18 @@ public:
 
     static std::pair<size_t, size_t> s_align_size(size_t offset, size_t size, 
size_t length);
 
+    // Asynchronously prefetch a range of file cache blocks.
+    // This method triggers read file cache in dryrun mode to warm up the cache
+    // without actually reading the data into user buffers.
+    //
+    // Parameters:
+    //   offset: Starting offset in the file
+    //   size: Number of bytes to prefetch
+    //   io_ctx: IO context (can be nullptr, will create a dryrun context internally)
+    //
+    // Note: This is a best-effort operation. Errors are logged but not returned.
+    void prefetch_range(size_t offset, size_t size, const IOContext* io_ctx = 
nullptr);
+
 protected:
     Status read_at_impl(size_t offset, Slice result, size_t* bytes_read,
                         const IOContext* io_ctx) override;
diff --git a/be/src/io/fs/s3_file_reader.cpp b/be/src/io/fs/s3_file_reader.cpp
index eede868468f..ff058c4df1b 100644
--- a/be/src/io/fs/s3_file_reader.cpp
+++ b/be/src/io/fs/s3_file_reader.cpp
@@ -38,6 +38,7 @@
 #include "runtime/thread_context.h"
 #include "runtime/workload_management/io_throttle.h"
 #include "util/bvar_helper.h"
+#include "util/concurrency_stats.h"
 #include "util/debug_points.h"
 #include "util/doris_metrics.h"
 #include "util/runtime_profile.h"
@@ -124,6 +125,8 @@ Status S3FileReader::read_at_impl(size_t offset, Slice 
result, size_t* bytes_rea
         return Status::InternalError("init s3 client error");
     }
 
+    
SCOPED_CONCURRENCY_COUNT(ConcurrencyStatsManager::instance().s3_file_reader_read);
+
     int retry_count = 0;
     const int base_wait_time = config::s3_read_base_wait_time_ms; // Base wait 
time in milliseconds
     const int max_wait_time = config::s3_read_max_wait_time_ms; // Maximum 
wait time in milliseconds
diff --git a/be/src/olap/olap_common.h b/be/src/olap/olap_common.h
index e8cc1e02198..0f61008f5b0 100644
--- a/be/src/olap/olap_common.h
+++ b/be/src/olap/olap_common.h
@@ -416,6 +416,7 @@ struct OlapReaderStatistics {
     int64_t segment_iterator_init_return_column_iterators_timer_ns = 0;
     int64_t segment_iterator_init_bitmap_index_iterators_timer_ns = 0;
     int64_t segment_iterator_init_inverted_index_iterators_timer_ns = 0;
+    int64_t segment_iterator_init_segment_prefetchers_timer_ns = 0;
 
     int64_t segment_create_column_readers_timer_ns = 0;
     int64_t segment_load_index_timer_ns = 0;
diff --git a/be/src/olap/rowset/segment_v2/column_reader.cpp 
b/be/src/olap/rowset/segment_v2/column_reader.cpp
index 698b6730d98..143e416e54d 100644
--- a/be/src/olap/rowset/segment_v2/column_reader.cpp
+++ b/be/src/olap/rowset/segment_v2/column_reader.cpp
@@ -65,6 +65,7 @@
 #include "util/binary_cast.hpp"
 #include "util/bitmap.h"
 #include "util/block_compression.h"
+#include "util/concurrency_stats.h"
 #include "util/rle_encoding.h" // for RleDecoder
 #include "util/slice.h"
 #include "vec/columns/column.h"
@@ -363,6 +364,7 @@ Status ColumnReader::new_inverted_index_iterator(
 Status ColumnReader::read_page(const ColumnIteratorOptions& iter_opts, const 
PagePointer& pp,
                                PageHandle* handle, Slice* page_body, 
PageFooterPB* footer,
                                BlockCompressionCodec* codec) const {
+    
SCOPED_CONCURRENCY_COUNT(ConcurrencyStatsManager::instance().column_reader_read_page);
     iter_opts.sanity_check();
     PageReadOptions opts(iter_opts.io_ctx);
     opts.verify_checksum = _opts.verify_checksum;
@@ -726,6 +728,16 @@ Status ColumnReader::seek_at_or_before(ordinal_t ordinal, 
OrdinalPageIndexIterat
     return Status::OK();
 }
 
+Status ColumnReader::get_ordinal_index_reader(OrdinalIndexReader*& reader,
+                                              OlapReaderStatistics* 
index_load_stats) {
+    CHECK(_ordinal_index) << fmt::format("ordinal index is null for column 
reader of type {}",
+                                         std::to_string(int(_meta_type)));
+    RETURN_IF_ERROR(
+            _ordinal_index->load(_use_index_page_cache, _opts.kept_in_memory, 
index_load_stats));
+    reader = _ordinal_index.get();
+    return Status::OK();
+}
+
 Status ColumnReader::new_iterator(ColumnIteratorUPtr* iterator, const 
TabletColumn* tablet_column) {
     return new_iterator(iterator, tablet_column, nullptr);
 }
@@ -1328,7 +1340,22 @@ Status FileColumnIterator::seek_to_first() {
     return Status::OK();
 }
 
+void FileColumnIterator::_trigger_prefetch_if_eligible(ordinal_t ord) {
+    std::vector<BlockRange> ranges;
+    if (_prefetcher->need_prefetch(ord, &ranges)) {
+        for (const auto& range : ranges) {
+            _cached_remote_file_reader->prefetch_range(range.offset, 
range.size, &_opts.io_ctx);
+        }
+    }
+}
+
 Status FileColumnIterator::seek_to_ordinal(ordinal_t ord) {
+    LOG_IF(INFO, config::enable_segment_prefetch_verbose_log) << fmt::format(
+            "[verbose] FileColumnIterator::seek_to_ordinal seek to ordinal {}, 
enable_prefetch={}",
+            ord, _enable_prefetch);
+    if (_enable_prefetch) {
+        _trigger_prefetch_if_eligible(ord);
+    }
     // if current page contains this row, we don't need to seek
     if (!_page || !_page.contains(ord) || !_page_iter.valid()) {
         RETURN_IF_ERROR(_reader->seek_at_or_before(ord, &_page_iter, _opts));
@@ -1611,6 +1638,24 @@ Status FileColumnIterator::get_row_ranges_by_dict(const 
AndBlockColumnPredicate*
     return Status::OK();
 }
 
+Status FileColumnIterator::init_prefetcher(const SegmentPrefetchParams& 
params) {
+    if (_cached_remote_file_reader =
+                
std::dynamic_pointer_cast<io::CachedRemoteFileReader>(_reader->_file_reader);
+        !_cached_remote_file_reader) {
+        return Status::OK();
+    }
+    _enable_prefetch = true;
+    _prefetcher = std::make_unique<SegmentPrefetcher>(params.config);
+    RETURN_IF_ERROR(_prefetcher->init(params.row_bitmap, _reader, 
params.read_options));
+    return Status::OK();
+}
+
+void FileColumnIterator::collect_prefetchers(std::vector<SegmentPrefetcher*>& 
prefetchers) {
+    if (_prefetcher) {
+        prefetchers.emplace_back(_prefetcher.get());
+    }
+}
+
 Status DefaultValueColumnIterator::init(const ColumnIteratorOptions& opts) {
     _opts = opts;
     // be consistent with segment v1
diff --git a/be/src/olap/rowset/segment_v2/column_reader.h 
b/be/src/olap/rowset/segment_v2/column_reader.h
index 45a85c6210f..067fb4702de 100644
--- a/be/src/olap/rowset/segment_v2/column_reader.h
+++ b/be/src/olap/rowset/segment_v2/column_reader.h
@@ -32,6 +32,7 @@
 #include "common/exception.h"
 #include "common/logging.h"
 #include "common/status.h" // for Status
+#include "io/cache/cached_remote_file_reader.h"
 #include "io/fs/file_reader_writer_fwd.h"
 #include "io/fs/file_system.h"
 #include "io/io_common.h"
@@ -41,6 +42,7 @@
 #include "olap/rowset/segment_v2/page_handle.h"        // for PageHandle
 #include "olap/rowset/segment_v2/page_pointer.h"
 #include "olap/rowset/segment_v2/parsed_page.h" // for ParsedPage
+#include "olap/rowset/segment_v2/segment_prefetcher.h"
 #include "olap/rowset/segment_v2/stream_reader.h"
 #include "olap/tablet_schema.h"
 #include "olap/types.h"
@@ -171,6 +173,8 @@ public:
     Status seek_to_first(OrdinalPageIndexIterator* iter, const 
ColumnIteratorOptions& iter_opts);
     Status seek_at_or_before(ordinal_t ordinal, OrdinalPageIndexIterator* iter,
                              const ColumnIteratorOptions& iter_opts);
+    Status get_ordinal_index_reader(OrdinalIndexReader*& reader,
+                                    OlapReaderStatistics* index_load_stats);
 
     // read a page from file into a page handle
     Status read_page(const ColumnIteratorOptions& iter_opts, const 
PagePointer& pp,
@@ -234,6 +238,8 @@ public:
 
 private:
     friend class VariantColumnReader;
+    friend class FileColumnIterator;
+    friend class SegmentPrefetcher;
 
     ColumnReader(const ColumnReaderOptions& opts, const ColumnMetaPB& meta, 
uint64_t num_rows,
                  io::FileReaderSPtr file_reader);
@@ -375,6 +381,10 @@ public:
 
     virtual bool is_all_dict_encoding() const { return false; }
 
+    virtual Status init_prefetcher(const SegmentPrefetchParams& params) { 
return Status::OK(); }
+
+    virtual void collect_prefetchers(std::vector<SegmentPrefetcher*>& 
prefetchers) {}
+
 protected:
     ColumnIteratorOptions _opts;
 };
@@ -422,11 +432,16 @@ public:
 
     bool is_all_dict_encoding() const override { return _is_all_dict_encoding; 
}
 
+    Status init_prefetcher(const SegmentPrefetchParams& params) override;
+
+    void collect_prefetchers(std::vector<SegmentPrefetcher*>& prefetchers) 
override;
+
 private:
     void _seek_to_pos_in_page(ParsedPage* page, ordinal_t offset_in_page) 
const;
     Status _load_next_page(bool* eos);
     Status _read_data_page(const OrdinalPageIndexIterator& iter);
     Status _read_dict_data();
+    void _trigger_prefetch_if_eligible(ordinal_t ord);
 
     std::shared_ptr<ColumnReader> _reader = nullptr;
 
@@ -454,6 +469,10 @@ private:
     bool _is_all_dict_encoding = false;
 
     std::unique_ptr<StringRef[]> _dict_word_info;
+
+    bool _enable_prefetch {false};
+    std::unique_ptr<SegmentPrefetcher> _prefetcher;
+    std::shared_ptr<io::CachedRemoteFileReader> _cached_remote_file_reader 
{nullptr};
 };
 
 class EmptyFileColumnIterator final : public ColumnIterator {
diff --git a/be/src/olap/rowset/segment_v2/ordinal_page_index.h 
b/be/src/olap/rowset/segment_v2/ordinal_page_index.h
index df60edb12d1..f4b71b50a59 100644
--- a/be/src/olap/rowset/segment_v2/ordinal_page_index.h
+++ b/be/src/olap/rowset/segment_v2/ordinal_page_index.h
@@ -101,6 +101,7 @@ private:
 
 private:
     friend OrdinalPageIndexIterator;
+    friend class SegmentPrefetcher;
 
     io::FileReaderSPtr _file_reader;
     DorisCallOnce<Status> _load_once;
diff --git a/be/src/olap/rowset/segment_v2/page_io.cpp 
b/be/src/olap/rowset/segment_v2/page_io.cpp
index b5d70aed21b..0107a690a87 100644
--- a/be/src/olap/rowset/segment_v2/page_io.cpp
+++ b/be/src/olap/rowset/segment_v2/page_io.cpp
@@ -42,6 +42,7 @@
 #include "olap/rowset/segment_v2/page_handle.h"
 #include "util/block_compression.h"
 #include "util/coding.h"
+#include "util/concurrency_stats.h"
 #include "util/crc32c.h"
 #include "util/faststring.h"
 #include "util/runtime_profile.h"
@@ -203,6 +204,7 @@ Status PageIO::read_and_decompress_page_(const 
PageReadOptions& opts, PageHandle
                     "Bad page: page is compressed but codec is NO_COMPRESSION, 
file={}",
                     opts.file_reader->path().native());
         }
+        
SCOPED_CONCURRENCY_COUNT(ConcurrencyStatsManager::instance().page_io_decompress);
         SCOPED_RAW_TIMER(&opts.stats->decompress_ns);
         std::unique_ptr<DataPage> decompressed_page = 
std::make_unique<DataPage>(
                 footer->uncompressed_size() + footer_size + 4, 
opts.use_page_cache, opts.type);
@@ -228,6 +230,7 @@ Status PageIO::read_and_decompress_page_(const 
PageReadOptions& opts, PageHandle
     if (opts.pre_decode && opts.encoding_info) {
         auto* pre_decoder = opts.encoding_info->get_data_page_pre_decoder();
         if (pre_decoder) {
+            
SCOPED_CONCURRENCY_COUNT(ConcurrencyStatsManager::instance().page_io_pre_decode);
             RETURN_IF_ERROR(pre_decoder->decode(
                     &page, &page_slice, 
footer->data_page_footer().nullmap_size() + footer_size + 4,
                     opts.use_page_cache, opts.type));
@@ -241,6 +244,7 @@ Status PageIO::read_and_decompress_page_(const 
PageReadOptions& opts, PageHandle
     // just before add it to pagecache, it will be consistency with reading 
data from page cache.
     opts.stats->uncompressed_bytes_read += body->size;
     if (opts.use_page_cache && cache) {
+        
SCOPED_CONCURRENCY_COUNT(ConcurrencyStatsManager::instance().page_io_insert_page_cache);
         // insert this page into cache and return the cache handle
         cache->insert(cache_key, page.get(), &cache_handle, opts.type, 
opts.kept_in_memory);
         *handle = PageHandle(std::move(cache_handle));
diff --git a/be/src/olap/rowset/segment_v2/segment_iterator.cpp 
b/be/src/olap/rowset/segment_v2/segment_iterator.cpp
index 5c5dd293916..96abe794c7a 100644
--- a/be/src/olap/rowset/segment_v2/segment_iterator.cpp
+++ b/be/src/olap/rowset/segment_v2/segment_iterator.cpp
@@ -31,6 +31,7 @@
 #include <utility>
 #include <vector>
 
+#include "cloud/config.h"
 #include "common/compiler_util.h" // IWYU pragma: keep
 #include "common/config.h"
 #include "common/consts.h"
@@ -38,6 +39,7 @@
 #include "common/logging.h"
 #include "common/object_pool.h"
 #include "common/status.h"
+#include "io/cache/cached_remote_file_reader.h"
 #include "io/fs/file_reader.h"
 #include "io/io_common.h"
 #include "olap/bloom_filter_predicate.h"
@@ -54,8 +56,10 @@
 #include "olap/rowset/segment_v2/indexed_column_reader.h"
 #include "olap/rowset/segment_v2/inverted_index_file_reader.h"
 #include "olap/rowset/segment_v2/inverted_index_reader.h"
+#include "olap/rowset/segment_v2/ordinal_page_index.h"
 #include "olap/rowset/segment_v2/row_ranges.h"
 #include "olap/rowset/segment_v2/segment.h"
+#include "olap/rowset/segment_v2/segment_prefetcher.h"
 #include "olap/rowset/segment_v2/variant/variant_column_reader.h"
 #include "olap/schema.h"
 #include "olap/short_key_index.h"
@@ -67,6 +71,7 @@
 #include "runtime/runtime_predicate.h"
 #include "runtime/runtime_state.h"
 #include "runtime/thread_context.h"
+#include "util/concurrency_stats.h"
 #include "util/defer_op.h"
 #include "util/doris_metrics.h"
 #include "util/key_util.h"
@@ -397,9 +402,99 @@ Status SegmentIterator::_lazy_init() {
     } else {
         _range_iter.reset(new BitmapRangeIterator(_row_bitmap));
     }
+
+    _init_segment_prefetchers();
+
     return Status::OK();
 }
 
+void SegmentIterator::_init_segment_prefetchers() {
+    
SCOPED_RAW_TIMER(&_opts.stats->segment_iterator_init_segment_prefetchers_timer_ns);
+    if (!config::is_cloud_mode()) {
+        return;
+    }
+    static std::vector<ReaderType> supported_reader_types {
+            ReaderType::READER_QUERY, ReaderType::READER_BASE_COMPACTION,
+            ReaderType::READER_CUMULATIVE_COMPACTION, 
ReaderType::READER_FULL_COMPACTION};
+    if (std::ranges::none_of(supported_reader_types,
+                             [&](ReaderType t) { return 
_opts.io_ctx.reader_type == t; })) {
+        return;
+    }
+    // Initialize segment prefetcher for predicate and non-predicate columns
+    bool is_query = (_opts.io_ctx.reader_type == ReaderType::READER_QUERY);
+    bool enable_prefetch = is_query ? 
config::enable_query_segment_file_cache_prefetch
+                                    : 
config::enable_compaction_segment_file_cache_prefetch;
+    LOG_IF(INFO, config::enable_segment_prefetch_verbose_log) << fmt::format(
+            "[verbose] SegmentIterator _init_segment_prefetchers, is_query={}, 
enable_prefetch={}, "
+            "_row_bitmap.isEmpty()={}, row_bitmap.cardinality()={}, tablet={}, 
rowset={}, "
+            "segment={}, predicate_column_ids={}, non_predicate_column_ids={}",
+            is_query, enable_prefetch, _row_bitmap.isEmpty(), 
_row_bitmap.cardinality(),
+            _opts.tablet_id, _opts.rowset_id.to_string(), segment_id(),
+            fmt::join(_predicate_column_ids, ","), 
fmt::join(_non_predicate_column_ids, ","));
+    if (enable_prefetch && !_row_bitmap.isEmpty()) {
+        int window_size =
+                1 + (is_query ? 
config::query_segment_file_cache_prefetch_block_size
+                              : 
config::compaction_segment_file_cache_prefetch_block_size);
+        LOG_IF(INFO, config::enable_segment_prefetch_verbose_log) << 
fmt::format(
+                "[verbose] SegmentIterator prefetch config: window_size={}", 
window_size);
+        if (window_size > 0 &&
+            !_column_iterators.empty()) { // ensure init_iterators has been 
called
+            SegmentPrefetcherConfig prefetch_config(window_size,
+                                                    
config::file_cache_each_block_size);
+            for (auto cid : _schema->column_ids()) {
+                auto& column_iter = _column_iterators[cid];
+                if (column_iter == nullptr) {
+                    continue;
+                }
+                const auto* tablet_column = _schema->column(cid);
+                SegmentPrefetchParams params {
+                        .config = prefetch_config,
+                        .row_bitmap = _row_bitmap,
+                        .read_options = _opts,
+                };
+                LOG_IF(INFO, config::enable_segment_prefetch_verbose_log) << 
fmt::format(
+                        "[verbose] SegmentIterator init_segment_prefetchers, "
+                        "tablet={}, rowset={}, segment={}, column_id={}, 
col_name={}, type={}",
+                        _opts.tablet_id, _opts.rowset_id.to_string(), 
segment_id(), cid,
+                        tablet_column->name(), tablet_column->type());
+                Status st = column_iter->init_prefetcher(params);
+                if (!st.ok()) {
+                    LOG_IF(WARNING, 
config::enable_segment_prefetch_verbose_log) << fmt::format(
+                            "[verbose] failed to init prefetcher for 
column_id={}, "
+                            "tablet={}, rowset={}, segment={}, error={}",
+                            cid, _opts.tablet_id, _opts.rowset_id.to_string(), 
segment_id(),
+                            st.to_string());
+                }
+            }
+
+            std::vector<SegmentPrefetcher*> prefetchers;
+            for (const auto& column_iter : _column_iterators) {
+                if (column_iter != nullptr) {
+                    column_iter->collect_prefetchers(prefetchers);
+                }
+            }
+
+            int batch_size = 
config::segment_file_cache_consume_rowids_batch_size;
+            std::vector<rowid_t> rowids(batch_size);
+            roaring::api::roaring_uint32_iterator_t iter;
+            roaring::api::roaring_init_iterator(&_row_bitmap.roaring, &iter);
+            uint32_t num =
+                    roaring::api::roaring_read_uint32_iterator(&iter, 
rowids.data(), batch_size);
+
+            for (; num > 0; num = 
roaring::api::roaring_read_uint32_iterator(&iter, rowids.data(),
+                                                                             
batch_size)) {
+                for (auto* prefetcher : prefetchers) {
+                    prefetcher->add_rowids(rowids.data(), num);
+                }
+            }
+
+            for (auto* prefetcher : prefetchers) {
+                prefetcher->finish_build_blocks();
+            }
+        }
+    }
+}
+
 Status SegmentIterator::_get_row_ranges_by_keys() {
     SCOPED_RAW_TIMER(&_opts.stats->generate_row_ranges_by_keys_ns);
     DorisMetrics::instance()->segment_row_total->increment(num_rows());
@@ -1699,7 +1794,11 @@ Status SegmentIterator::_read_columns_by_index(uint32_t 
nrows_read_limit, uint32
     nrows_read = _range_iter->read_batch_rowids(_block_rowids.data(), 
nrows_read_limit);
     bool is_continuous = (nrows_read > 1) &&
                          (_block_rowids[nrows_read - 1] - _block_rowids[0] == 
nrows_read - 1);
-
+    LOG_IF(INFO, config::enable_segment_prefetch_verbose_log) << fmt::format(
+            "[verbose] SegmentIterator::_read_columns_by_index read {} rowids, 
continuous: {}, "
+            "rowids: [{}...{}]",
+            nrows_read, is_continuous, nrows_read > 0 ? _block_rowids[0] : 0,
+            nrows_read > 0 ? _block_rowids[nrows_read - 1] : 0);
     for (auto cid : _predicate_column_ids) {
         auto& column = _current_return_columns[cid];
         if (_no_need_read_key_data(cid, column, nrows_read)) {
@@ -2033,6 +2132,8 @@ void SegmentIterator::_clear_iterators() {
 }
 
 Status SegmentIterator::_next_batch_internal(vectorized::Block* block) {
+    
SCOPED_CONCURRENCY_COUNT(ConcurrencyStatsManager::instance().segment_iterator_next_batch);
+
     bool is_mem_reuse = block->mem_reuse();
     DCHECK(is_mem_reuse);
     // Clear the sparse column cache before processing a new batch
diff --git a/be/src/olap/rowset/segment_v2/segment_iterator.h 
b/be/src/olap/rowset/segment_v2/segment_iterator.h
index fc9df6ca2ad..25bce36ae00 100644
--- a/be/src/olap/rowset/segment_v2/segment_iterator.h
+++ b/be/src/olap/rowset/segment_v2/segment_iterator.h
@@ -379,6 +379,8 @@ private:
 
     void _clear_iterators();
 
+    void _init_segment_prefetchers();
+
     class BitmapRangeIterator;
     class BackwardBitmapRangeIterator;
 
diff --git a/be/src/olap/rowset/segment_v2/segment_prefetcher.cpp 
b/be/src/olap/rowset/segment_v2/segment_prefetcher.cpp
new file mode 100644
index 00000000000..d6b90267a47
--- /dev/null
+++ b/be/src/olap/rowset/segment_v2/segment_prefetcher.cpp
@@ -0,0 +1,180 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "olap/rowset/segment_v2/segment_prefetcher.h"
+
+#include <algorithm>
+#include <ranges>
+
+#include "common/config.h"
+#include "common/logging.h"
+#include "olap/iterators.h"
+#include "olap/rowset/segment_v2/column_reader.h"
+#include "olap/rowset/segment_v2/ordinal_page_index.h"
+
+namespace doris::segment_v2 {
+
+// Consumes one batch of rowids and extends the (block_id, first_rowid) sequence.
+//
+// The page cursor (page_idx) only ever moves forward, so batches are expected to be
+// supplied in ascending rowid order. A block is appended to _block_sequence once the
+// next block is entered; the block still being accumulated is flushed later by
+// finish_build_blocks().
+//
+// Parameters:
+//   rowids: pointer to `num` rowids
+//   num: number of rowids in this batch
+void SegmentPrefetcher::add_rowids(const rowid_t* rowids, uint32_t num) {
+    if (ordinal_index == nullptr) {
+        return;
+    }
+    const auto& first_ordinals = ordinal_index->_ordinals; // first_ordinals[i] = first ordinal of page i
+    const auto& page_pointers = ordinal_index->_pages;     // page_pointers[i] = page pointer of page i
+    const int page_count = ordinal_index->_num_pages;
+
+    for (uint32_t pos = 0; pos != num; ++pos) {
+        const rowid_t rowid = rowids[pos];
+
+        // Move the cursor to the page containing `rowid`. This step is the same for
+        // forward and backward scans because the bitmap is always walked upwards.
+        while (page_idx < page_count - 1 && first_ordinals[page_idx + 1] <= rowid) {
+            ++page_idx;
+        }
+        const size_t block_id = _offset_to_block_id(page_pointers[page_idx].offset);
+        const bool entered_new_block = (block_id != last_block_id);
+
+        if (entered_new_block) {
+            // Emit the block we just left, unless this is the very first block seen.
+            if (last_block_id != static_cast<size_t>(-1)) {
+                _block_sequence.emplace_back(last_block_id, current_block_first_rowid);
+            }
+            last_block_id = block_id;
+        }
+
+        // Forward scan: remember only the first (smallest) rowid of each block.
+        // Backward scan: keep overwriting, so the last (largest) rowid of the block
+        // wins -- that is the one a backward reader meets first.
+        if (entered_new_block || !_is_forward) {
+            current_block_first_rowid = rowid;
+        }
+    }
+}
+
+// Finalizes the block sequence after the last add_rowids() call: flushes the block
+// that is still pending and, for backward scans, flips the sequence into reading order.
+void SegmentPrefetcher::finish_build_blocks() {
+    if (ordinal_index == nullptr) {
+        return;
+    }
+
+    const bool has_pending_block = (last_block_id != static_cast<size_t>(-1));
+    if (has_pending_block) {
+        _block_sequence.emplace_back(last_block_id, current_block_first_rowid);
+    }
+
+    // Blocks were collected in ascending order; a backward scan visits them in reverse.
+    if (!_is_forward && !_block_sequence.empty()) {
+        std::reverse(_block_sequence.begin(), _block_sequence.end());
+    }
+
+    const auto describe_block = [](const auto& b) {
+        return fmt::format("({}, {})", b.block_id, b.first_rowid);
+    };
+    LOG_IF(INFO, config::enable_segment_prefetch_verbose_log) << fmt::format(
+            "[verbose] SegmentPrefetcher initialized with block count={}, is_forward={}, "
+            "num_pages={}, path={}, blocks: (block_id, first_rowid)=[{}]",
+            _block_sequence.size(), _is_forward, ordinal_index->_num_pages, _path,
+            fmt::join(_block_sequence | std::views::transform(describe_block), ","));
+}
+
+// Prepares the prefetcher for a new scan.
+//
+// Resets all scan state (the block sequence, the read/prefetch positions and the
+// incremental builder state used by add_rowids()/finish_build_blocks()), records the
+// scan direction and file name, and loads the column's ordinal index.
+//
+// Parameters:
+//   row_bitmap: rowids that will be scanned; when empty no ordinal index is loaded
+//   column_reader: column reader used to obtain the ordinal index (must not be null)
+//   read_options: supplies the scan direction and the stats sink
+//
+// Returns OK on success, or the error from loading the ordinal index.
+Status SegmentPrefetcher::init(const roaring::Roaring& row_bitmap,
+                               std::shared_ptr<ColumnReader> column_reader,
+                               const StorageReadOptions& read_options) {
+    DCHECK(column_reader != nullptr);
+
+    _block_sequence.clear();
+    _current_block_index = 0;
+    _prefetched_index = -1;
+    // Also reset the builder state: without this a second init() would continue from
+    // the previous scan's page cursor / pending block, and a stale ordinal_index would
+    // let add_rowids() run even when this scan loads no index (empty bitmap).
+    page_idx = 0;
+    last_block_id = static_cast<size_t>(-1);
+    current_block_first_rowid = 0;
+    ordinal_index = nullptr;
+
+    _is_forward = !read_options.read_orderby_key_reverse;
+    _path = column_reader->_file_reader->path().filename().native();
+
+    if (row_bitmap.isEmpty()) {
+        return Status::OK();
+    }
+
+    // ordinal_index may still be null afterwards; add_rowids() and
+    // finish_build_blocks() treat that as "nothing to prefetch".
+    RETURN_IF_ERROR(column_reader->get_ordinal_index_reader(ordinal_index, read_options.stats));
+    return Status::OK();
+}
+
+// Advances the current block to the one containing `current_rowid` and collects the
+// ranges of the blocks that must be prefetched to refill the prefetch window.
+//
+// Parameters:
+//   current_rowid: rowid the reader is positioned at
+//   out_ranges: receives the byte ranges to prefetch; left untouched when the block
+//               sequence is empty or every block has already been prefetched
+//
+// Returns true when at least one range was written to out_ranges.
+bool SegmentPrefetcher::need_prefetch(rowid_t current_rowid, std::vector<BlockRange>* out_ranges) {
+    DCHECK(out_ranges != nullptr);
+    LOG_IF(INFO, config::enable_segment_prefetch_verbose_log)
+            << fmt::format("[verbose] SegmentPrefetcher need_prefetch enter current_rowid={}, {}",
+                           current_rowid, debug_string());
+
+    if (_block_sequence.empty()) {
+        return false;
+    }
+    // Every block of the sequence has already been handed out for prefetching.
+    if (_prefetched_index >= static_cast<int>(_block_sequence.size()) - 1) {
+        return false;
+    }
+
+    LOG_IF(INFO, config::enable_segment_prefetch_verbose_log) << fmt::format(
+            "[verbose] SegmentPrefetcher need_prefetch called with current_rowid={}, {}, "
+            "block=(id={}, first_rowid={})",
+            current_rowid, debug_string(), _block_sequence[_current_block_index].block_id,
+            _block_sequence[_current_block_index].first_rowid);
+
+    // A following block has been reached once the reader is at or past its first rowid
+    // in reading direction (ascending for forward scans, descending for backward scans).
+    const auto reached = [this, current_rowid](const BlockInfo& block) {
+        return _is_forward ? block.first_rowid <= current_rowid
+                           : block.first_rowid >= current_rowid;
+    };
+    while (_current_block_index + 1 < _block_sequence.size() &&
+           reached(_block_sequence[_current_block_index + 1])) {
+        ++_current_block_index;
+    }
+
+    out_ranges->clear();
+    // for non-predicate column, some rowids in row_bitmap may be filtered out after vec
+    // evaluation of predicate columns, so we should not prefetch for these rows
+    _prefetched_index = std::max(_prefetched_index, _current_block_index - 1);
+    while (_prefetched_index + 1 < _block_sequence.size() &&
+           window_size() < _config.prefetch_window_size) {
+        ++_prefetched_index;
+        out_ranges->push_back(_block_id_to_range(_block_sequence[_prefetched_index].block_id));
+    }
+
+    LOG_IF(INFO, config::enable_segment_prefetch_verbose_log) << fmt::format(
+            "[verbose] SegmentPrefetcher need_prefetch after calc with current_rowid={}, {}, "
+            "block=(id={}, first_rowid={})",
+            current_rowid, debug_string(), _block_sequence[_current_block_index].block_id,
+            _block_sequence[_current_block_index].first_rowid);
+
+    if (out_ranges->empty()) {
+        return false;
+    }
+
+    const auto describe_range = [](const auto& b) {
+        return fmt::format("({}, {})", b.offset, b.size);
+    };
+    LOG_IF(INFO, config::enable_segment_prefetch_verbose_log) << fmt::format(
+            "[verbose] SegmentPrefetcher prefetch triggered at rowid={}, {}, prefetch {} "
+            "blocks: (offset, size)=[{}]",
+            current_rowid, debug_string(), out_ranges->size(),
+            fmt::join(*out_ranges | std::views::transform(describe_range), ","));
+    return true;
+}
+
+} // namespace doris::segment_v2
diff --git a/be/src/olap/rowset/segment_v2/segment_prefetcher.h 
b/be/src/olap/rowset/segment_v2/segment_prefetcher.h
new file mode 100644
index 00000000000..fae65383820
--- /dev/null
+++ b/be/src/olap/rowset/segment_v2/segment_prefetcher.h
@@ -0,0 +1,176 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <cstddef>
+#include <cstdint>
+#include <roaring/roaring.hh>
+#include <vector>
+
+#include "common/status.h"
+#include "olap/iterators.h"
+#include "olap/rowset/segment_v2/common.h"
+
+namespace doris {
+namespace io {
+class FileReader;
+} // namespace io
+
+namespace segment_v2 {
+class OrdinalIndexReader;
+class ColumnReader;
+
+// Tunables of the segment prefetcher: how far ahead to prefetch and in which unit.
+struct SegmentPrefetcherConfig {
+    // How many file cache blocks are prefetched ahead of the reader
+    size_t prefetch_window_size {4};
+
+    // Size of one file cache block in bytes (1MB unless overridden)
+    size_t block_size {1024 * 1024};
+
+    SegmentPrefetcherConfig() = default;
+    SegmentPrefetcherConfig(size_t window_size, size_t blk_size) {
+        prefetch_window_size = window_size;
+        block_size = blk_size;
+    }
+};
+
+// Half-open byte range [offset, offset + size) inside the segment file
+struct BlockRange {
+    uint64_t offset;
+    uint64_t size;
+
+    BlockRange(uint64_t off, uint64_t sz) : offset {off}, size {sz} {}
+
+    // Two ranges are equal when both their start and their length match.
+    bool operator==(const BlockRange& other) const {
+        if (offset != other.offset) {
+            return false;
+        }
+        return size == other.size;
+    }
+};
+
+// A file cache block paired with the rowid at which reading enters that block
+struct BlockInfo {
+    size_t block_id;
+    rowid_t first_rowid;
+
+    BlockInfo(size_t bid, rowid_t rid) : block_id {bid}, first_rowid {rid} {}
+};
+
+// Bundle of inputs used to set up a segment prefetcher.
+// row_bitmap and read_options are held by reference, so the referenced objects
+// must stay alive for as long as this struct is used.
+struct SegmentPrefetchParams {
+    // Prefetch window / block size settings (stored by value)
+    SegmentPrefetcherConfig config;
+    // Bitmap of rowids (not owned)
+    const roaring::Roaring& row_bitmap;
+    // Storage read options (not owned)
+    const StorageReadOptions& read_options;
+};
+
+// SegmentPrefetcher maintains block sequence and triggers prefetch to keep
+// N blocks ahead of current reading position.
+//
+// Key design:
+// - Monotonic reading: rowids are read in order (forward or backward)
+// - Trigger condition: when current_rowid reaches a block boundary, prefetch next N blocks
+// - No deduplication needed: reading is monotonic, blocks are naturally processed in order
+//
+// Usage:
+//   SegmentPrefetcher prefetcher(config);
+//   RETURN_IF_ERROR(prefetcher.init(row_bitmap, column_reader, read_options));
+//   // Feed the rowids to scan in ascending order, batch by batch, then finalize:
+//   prefetcher.add_rowids(rowids, num);
+//   prefetcher.finish_build_blocks();
+//   // In each next_batch():
+//   std::vector<BlockRange> ranges;
+//   if (prefetcher.need_prefetch(current_first_rowid, &ranges)) {
+//       for (auto& range : ranges) {
+//           file_reader->prefetch_range(range.offset, range.size);
+//       }
+//   }
+//
+class SegmentPrefetcher {
+public:
+    explicit SegmentPrefetcher(const SegmentPrefetcherConfig& config) : _config(config) {}
+
+    ~SegmentPrefetcher() = default;
+
+    // Initialize prefetcher for a scan and load the column's ordinal index.
+    // The block sequence itself is built afterwards through add_rowids() and
+    // finish_build_blocks().
+    //
+    // Parameters:
+    //   row_bitmap: The complete bitmap of rowids to scan
+    //   column_reader: Column reader for accessing ordinal index
+    //   read_options: Storage read options
+    //
+    // Returns OK on success, error status on failure
+    Status init(const roaring::Roaring& row_bitmap, std::shared_ptr<ColumnReader> column_reader,
+                const StorageReadOptions& read_options);
+
+    // Check if prefetch is needed for current_rowid and return blocks to prefetch.
+    // This maintains N blocks ahead of the current reading position.
+    //
+    // Parameters:
+    //   current_rowid: The first rowid being read in current batch
+    //   out_ranges: Output vector of BlockRange to prefetch (only filled if return true)
+    //
+    // Returns true if prefetch is needed, false otherwise
+    bool need_prefetch(rowid_t current_rowid, std::vector<BlockRange>* out_ranges);
+
+    // Feed the next batch of rowids (ascending order) into the block sequence builder.
+    void add_rowids(const rowid_t* rowids, uint32_t num);
+    // Finish building the block sequence; call once after the last add_rowids().
+    void finish_build_blocks();
+
+private:
+    // Index of the file cache block that contains the given file offset
+    size_t _offset_to_block_id(uint64_t offset) const { return offset / _config.block_size; }
+
+    // Byte range covered by the given file cache block
+    BlockRange _block_id_to_range(size_t block_id) const {
+        return {block_id * _config.block_size, _config.block_size};
+    }
+
+    // Number of blocks from the current block up to the last prefetched one (inclusive)
+    int window_size() const { return _prefetched_index - _current_block_index + 1; }
+
+    std::string debug_string() const {
+        return fmt::format(
+                "[internal state] _is_forward={}, _prefetched_index={}, _current_block_index={}, "
+                "window_size={}, block.size()={}, path={}",
+                _is_forward, _prefetched_index, _current_block_index, window_size(),
+                _block_sequence.size(), _path);
+    }
+
+private:
+    SegmentPrefetcherConfig _config;
+    std::string _path;
+
+    // Sequence of blocks with their first rowid (in reading order)
+    // For forward reading: first_rowid is the first rowid we need to read in each block
+    // For backward reading: first_rowid is the last rowid we need to read in each block
+    //   (since we read backwards, this is the first one we'll encounter)
+    std::vector<BlockInfo> _block_sequence;
+
+    bool _is_forward = true;
+
+    // Index into _block_sequence of the last block handed out for prefetch (-1: none yet)
+    int _prefetched_index = -1;
+    // Index into _block_sequence of the block currently being read
+    int _current_block_index = 0;
+
+    // State of the incremental builder (add_rowids / finish_build_blocks):
+    // page cursor into the ordinal index
+    int page_idx = 0;
+    // block currently being accumulated; static_cast<size_t>(-1) means "none yet"
+    size_t last_block_id = static_cast<size_t>(-1);
+    // rowid recorded for the block being accumulated
+    // For forward: the smallest rowid in this block
+    // For backward: the largest rowid in this block (first one we'll encounter when reading backwards)
+    rowid_t current_block_first_rowid = 0;
+
+    // Ordinal index of the column, set by init(); null when there is nothing to prefetch
+    OrdinalIndexReader* ordinal_index = nullptr;
+};
+
+} // namespace segment_v2
+} // namespace doris
diff --git a/be/src/pipeline/exec/olap_scan_operator.cpp 
b/be/src/pipeline/exec/olap_scan_operator.cpp
index 3264c1b5f6f..91d7e2c934b 100644
--- a/be/src/pipeline/exec/olap_scan_operator.cpp
+++ b/be/src/pipeline/exec/olap_scan_operator.cpp
@@ -238,6 +238,8 @@ Status OlapScanLocalState::_init_profile() {
             ADD_TIMER(_scanner_profile, 
"SegmentIteratorInitBitmapIndexIteratorsTimer");
     _segment_iterator_init_inverted_index_iterators_timer =
             ADD_TIMER(_scanner_profile, 
"SegmentIteratorInitInvertedIndexIteratorsTimer");
+    _segment_iterator_init_segment_prefetchers_timer =
+            ADD_TIMER(_scanner_profile, 
"SegmentIteratorInitSegmentPrefetchersTimer");
 
     _segment_create_column_readers_timer =
             ADD_TIMER(_scanner_profile, "SegmentCreateColumnReadersTimer");
diff --git a/be/src/pipeline/exec/olap_scan_operator.h 
b/be/src/pipeline/exec/olap_scan_operator.h
index 5c3d0f38b84..38b5b7ed304 100644
--- a/be/src/pipeline/exec/olap_scan_operator.h
+++ b/be/src/pipeline/exec/olap_scan_operator.h
@@ -242,6 +242,7 @@ private:
     RuntimeProfile::Counter* 
_segment_iterator_init_return_column_iterators_timer = nullptr;
     RuntimeProfile::Counter* 
_segment_iterator_init_bitmap_index_iterators_timer = nullptr;
     RuntimeProfile::Counter* 
_segment_iterator_init_inverted_index_iterators_timer = nullptr;
+    RuntimeProfile::Counter* _segment_iterator_init_segment_prefetchers_timer 
= nullptr;
 
     RuntimeProfile::Counter* _segment_create_column_readers_timer = nullptr;
     RuntimeProfile::Counter* _segment_load_index_timer = nullptr;
diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h
index 9f9ea7eb870..af7ba0652d3 100644
--- a/be/src/runtime/exec_env.h
+++ b/be/src/runtime/exec_env.h
@@ -243,6 +243,7 @@ public:
     ThreadPool* lazy_release_obj_pool() { return _lazy_release_obj_pool.get(); 
}
     ThreadPool* non_block_close_thread_pool();
     ThreadPool* s3_file_system_thread_pool() { return 
_s3_file_system_thread_pool.get(); }
+    ThreadPool* segment_prefetch_thread_pool() { return 
_segment_prefetch_thread_pool.get(); }
 
     Status init_pipeline_task_scheduler();
     void init_file_cache_factory(std::vector<doris::CachePath>& cache_paths);
@@ -452,6 +453,8 @@ private:
     std::unique_ptr<ThreadPool> _lazy_release_obj_pool;
     std::unique_ptr<ThreadPool> _non_block_close_thread_pool;
     std::unique_ptr<ThreadPool> _s3_file_system_thread_pool;
+    // Threadpool used to prefetch segment file cache blocks
+    std::unique_ptr<ThreadPool> _segment_prefetch_thread_pool;
 
     FragmentMgr* _fragment_mgr = nullptr;
     pipeline::TaskScheduler* _without_group_task_scheduler = nullptr;
diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp
index d4d375e496a..c813716f7fb 100644
--- a/be/src/runtime/exec_env_init.cpp
+++ b/be/src/runtime/exec_env_init.cpp
@@ -245,6 +245,11 @@ Status ExecEnv::_init(const std::vector<StorePath>& 
store_paths,
                               .set_max_threads(buffered_reader_max_threads)
                               .build(&_buffered_reader_prefetch_thread_pool));
 
+    static_cast<void>(ThreadPoolBuilder("SegmentPrefetchThreadPool")
+                              
.set_min_threads(config::segment_prefetch_thread_pool_thread_num_min)
+                              
.set_max_threads(config::segment_prefetch_thread_pool_thread_num_max)
+                              .build(&_segment_prefetch_thread_pool));
+
     static_cast<void>(ThreadPoolBuilder("SendTableStatsThreadPool")
                               .set_min_threads(8)
                               .set_max_threads(32)
@@ -769,6 +774,7 @@ void ExecEnv::destroy() {
         _runtime_query_statistics_mgr->stop_report_thread();
     }
     SAFE_SHUTDOWN(_buffered_reader_prefetch_thread_pool);
+    SAFE_SHUTDOWN(_segment_prefetch_thread_pool);
     SAFE_SHUTDOWN(_s3_file_upload_thread_pool);
     SAFE_SHUTDOWN(_lazy_release_obj_pool);
     SAFE_SHUTDOWN(_non_block_close_thread_pool);
@@ -823,6 +829,7 @@ void ExecEnv::destroy() {
     _s3_file_system_thread_pool.reset(nullptr);
     _send_table_stats_thread_pool.reset(nullptr);
     _buffered_reader_prefetch_thread_pool.reset(nullptr);
+    _segment_prefetch_thread_pool.reset(nullptr);
     _s3_file_upload_thread_pool.reset(nullptr);
     _send_batch_thread_pool.reset(nullptr);
     _write_cooldown_meta_executors.reset(nullptr);
diff --git a/be/src/service/doris_main.cpp b/be/src/service/doris_main.cpp
index 82dd7432674..74c0e27fcd8 100644
--- a/be/src/service/doris_main.cpp
+++ b/be/src/service/doris_main.cpp
@@ -48,6 +48,7 @@
 #include "cloud/cloud_backend_service.h"
 #include "cloud/config.h"
 #include "common/stack_trace.h"
+#include "util/concurrency_stats.h"
 #include "olap/tablet_schema_cache.h"
 #include "olap/utils.h"
 #include "runtime/memory/mem_tracker_limiter.h"
@@ -533,6 +534,9 @@ int main(int argc, char** argv) {
         return 0;
     }
 
+    // Start concurrency stats manager
+    doris::ConcurrencyStatsManager::instance().start();
+
     // begin to start services
     doris::ThriftRpcHelper::setup(exec_env);
     // 1. thrift server with be_port
diff --git a/be/src/util/concurrency_stats.cpp 
b/be/src/util/concurrency_stats.cpp
new file mode 100644
index 00000000000..b835bdb2825
--- /dev/null
+++ b/be/src/util/concurrency_stats.cpp
@@ -0,0 +1,131 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "util/concurrency_stats.h"
+
+#include <chrono>
+#include <sstream>
+
+#include "common/config.h"
+#include "common/logging.h"
+
+namespace doris {
+// Allocates every counter. Counters are created in read-path order (top to bottom),
+// which is also the order in which they are registered for printing.
+ConcurrencyStatsManager::ConcurrencyStatsManager() : _running(false) {
+    // Creates a counter and appends it to _counters so that dump_to_log() prints it.
+    const auto printed = [this](const char* name) {
+        auto* counter = new ConcurrencyCounter(name);
+        _counters.push_back(counter);
+        return counter;
+    };
+
+    vscanner_get_block = printed("vscanner");
+    segment_iterator_next_batch = printed("segment_iterator");
+    column_reader_read_page = printed("column_reader");
+    page_io_decompress = printed("page_io.decompress");
+    page_io_pre_decode = printed("page_io.pre_decode");
+    page_io_insert_page_cache = printed("page_io.insert_page_cache");
+    cached_remote_reader_read_at = printed("file_cache.read_at");
+    cached_remote_reader_get_or_set = printed("file_cache.get_or_set");
+    cached_remote_reader_get_or_set_wait_lock = printed("file_cache.get_or_set_wait_lock");
+    // The downloader counter is created but deliberately not added to _counters,
+    // so it is not part of the printed line.
+    cached_remote_reader_get_or_set_downloader =
+            new ConcurrencyCounter("file_cache.get_or_set_downloader");
+    cached_remote_reader_write_back = printed("file_cache.write_back");
+    cached_remote_reader_blocking = printed("file_cache.blocking");
+    cached_remote_reader_local_read = printed("file_cache.local_read");
+    s3_file_reader_read = printed("s3.read");
+}
+
+// Stops the dump thread and releases every counter allocated by the constructor.
+ConcurrencyStatsManager::~ConcurrencyStatsManager() {
+    stop();
+
+    // cached_remote_reader_get_or_set_downloader is allocated by the constructor but
+    // not registered in _counters, so deleting only the registered counters would
+    // leak it. The membership check avoids a double delete should it ever be
+    // registered for printing.
+    bool downloader_registered = false;
+    for (auto* counter : _counters) {
+        if (counter == cached_remote_reader_get_or_set_downloader) {
+            downloader_registered = true;
+        }
+        delete counter;
+    }
+    _counters.clear();
+
+    if (!downloader_registered) {
+        delete cached_remote_reader_get_or_set_downloader;
+    }
+}
+
+// Returns the process-wide manager, constructed on first use.
+ConcurrencyStatsManager& ConcurrencyStatsManager::instance() {
+    static ConcurrencyStatsManager manager;
+    return manager;
+}
+
+// Launches the background dump thread; does nothing when it is already running.
+void ConcurrencyStatsManager::start() {
+    const bool already_running = _running.exchange(true);
+    if (!already_running) {
+        _dump_thread = std::make_unique<std::thread>(&ConcurrencyStatsManager::_dump_thread_func,
+                                                     this);
+    }
+}
+
+// Signals the dump thread to exit and waits for it; does nothing when not running.
+void ConcurrencyStatsManager::stop() {
+    const bool was_running = _running.exchange(false);
+    if (was_running) {
+        if (_dump_thread && _dump_thread->joinable()) {
+            _dump_thread->join();
+        }
+        _dump_thread.reset();
+    }
+}
+
+// Writes the current value of every registered counter to the INFO log as one line:
+//   CONCURRENCY_STATS name1=value1 name2=value2 ...
+// Nothing is logged when no counter is registered.
+void ConcurrencyStatsManager::dump_to_log() {
+    if (_counters.empty()) {
+        return;
+    }
+
+    std::stringstream line;
+    line << "CONCURRENCY_STATS";
+    for (size_t i = 0; i < _counters.size(); ++i) {
+        const ConcurrencyCounter* counter = _counters[i];
+        line << ' ' << counter->name() << '=' << counter->value();
+    }
+
+    LOG(INFO) << line.str();
+}
+
+// Body of the background thread: while the manager is running, dumps the counters
+// (when enabled by config) and then waits for the configured interval.
+//
+// The wait is split into short slices that re-check _running, so that stop() -- which
+// joins this thread -- returns within about one slice instead of blocking for a whole
+// (possibly long) dump interval.
+void ConcurrencyStatsManager::_dump_thread_func() {
+    constexpr int32_t kMaxSleepSliceMs = 100;
+
+    while (_running.load(std::memory_order_relaxed)) {
+        // Check if dumping is enabled
+        if (config::enable_concurrency_stats_dump) {
+            dump_to_log();
+        }
+
+        // Wait for the configured interval
+        int32_t remaining_ms = config::concurrency_stats_dump_interval_ms;
+        if (remaining_ms <= 0) {
+            remaining_ms = 100; // Default to 100ms if invalid
+        }
+
+        while (remaining_ms > 0 && _running.load(std::memory_order_relaxed)) {
+            const int32_t slice_ms =
+                    remaining_ms < kMaxSleepSliceMs ? remaining_ms : kMaxSleepSliceMs;
+            std::this_thread::sleep_for(std::chrono::milliseconds(slice_ms));
+            remaining_ms -= slice_ms;
+        }
+    }
+}
+
+} // namespace doris
diff --git a/be/src/util/concurrency_stats.h b/be/src/util/concurrency_stats.h
new file mode 100644
index 00000000000..1615aa9c5a3
--- /dev/null
+++ b/be/src/util/concurrency_stats.h
@@ -0,0 +1,123 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <atomic>
+#include <memory>
+#include <string>
+#include <thread>
+#include <vector>
+
+namespace doris {
+
+// Named counter of operations currently in flight.
+// The count lives in a single atomic integer, updated and read with relaxed ordering.
+class ConcurrencyCounter {
+public:
+    explicit ConcurrencyCounter(std::string name) : _name(std::move(name)) {}
+
+    // Name under which the counter is reported
+    const std::string& name() const { return _name; }
+
+    // Current number of in-flight operations
+    int64_t value() const { return _count.load(std::memory_order_relaxed); }
+
+    // Record that one more operation has started
+    void increment() { _count.fetch_add(1, std::memory_order_relaxed); }
+
+    // Record that one operation has finished
+    void decrement() { _count.fetch_sub(1, std::memory_order_relaxed); }
+
+    // Scope guard: increments the counter on construction and decrements it on
+    // destruction. A null counter turns the guard into a no-op.
+    class Guard {
+    public:
+        explicit Guard(ConcurrencyCounter* counter) : _counter(counter) {
+            if (_counter != nullptr) {
+                _counter->increment();
+            }
+        }
+
+        ~Guard() {
+            if (_counter != nullptr) {
+                _counter->decrement();
+            }
+        }
+
+        Guard(const Guard&) = delete;
+        Guard& operator=(const Guard&) = delete;
+
+    private:
+        ConcurrencyCounter* _counter;
+    };
+
+private:
+    std::string _name;
+    std::atomic<int64_t> _count {0};
+};
+
+// Process-wide owner of all concurrency counters, with an optional background
+// thread that periodically writes them to the log.
+// Obtain the single instance through instance(); the type is neither copyable nor
+// publicly constructible.
+class ConcurrencyStatsManager {
+public:
+    static ConcurrencyStatsManager& instance();
+
+    ConcurrencyStatsManager(const ConcurrencyStatsManager&) = delete;
+    ConcurrencyStatsManager& operator=(const ConcurrencyStatsManager&) = delete;
+
+    // Start the background thread that periodically logs the counters
+    void start();
+
+    // Stop the background thread
+    void stop();
+
+    // Log all registered counters once, right now
+    void dump_to_log();
+
+    // The individual counters, listed in read-path order from top to bottom.
+    // They are assigned by the constructor.
+    ConcurrencyCounter* vscanner_get_block = nullptr;
+    ConcurrencyCounter* segment_iterator_next_batch = nullptr;
+    ConcurrencyCounter* column_reader_read_page = nullptr;
+    ConcurrencyCounter* page_io_decompress = nullptr;
+    ConcurrencyCounter* page_io_pre_decode = nullptr;
+    ConcurrencyCounter* page_io_insert_page_cache = nullptr;
+    ConcurrencyCounter* cached_remote_reader_read_at = nullptr;
+    ConcurrencyCounter* cached_remote_reader_get_or_set = nullptr;
+    ConcurrencyCounter* cached_remote_reader_get_or_set_wait_lock = nullptr;
+    ConcurrencyCounter* cached_remote_reader_get_or_set_downloader = nullptr;
+    ConcurrencyCounter* cached_remote_reader_write_back = nullptr;
+    ConcurrencyCounter* cached_remote_reader_blocking = nullptr;
+    ConcurrencyCounter* cached_remote_reader_local_read = nullptr;
+    ConcurrencyCounter* s3_file_reader_read = nullptr;
+
+private:
+    ConcurrencyStatsManager();
+    ~ConcurrencyStatsManager();
+
+    // Entry point of the background dump thread
+    void _dump_thread_func();
+
+    // Counters to print, in print order
+    std::vector<ConcurrencyCounter*> _counters;
+
+    // True while the background thread is supposed to keep running
+    std::atomic<bool> _running;
+    std::unique_ptr<std::thread> _dump_thread;
+};
+
+// Token-pasting helpers. Operands of ## are not macro-expanded, so __LINE__ must go
+// through one extra macro level to be replaced by the line number before pasting.
+#define DORIS_CONCURRENCY_CONCAT_IMPL(a, b) a##b
+#define DORIS_CONCURRENCY_CONCAT(a, b) DORIS_CONCURRENCY_CONCAT_IMPL(a, b)
+
+// Macro for scoped counting: increments `counter_ptr` now and decrements it when the
+// enclosing scope ends. The guard variable is named after the source line, so the
+// macro can be used more than once in the same scope (on different lines).
+// The trailing semicolon is kept so existing call sites continue to compile.
+#define SCOPED_CONCURRENCY_COUNT(counter_ptr) \
+    doris::ConcurrencyCounter::Guard DORIS_CONCURRENCY_CONCAT(_concurrency_guard_, \
+                                                              __LINE__)(counter_ptr);
+
+} // namespace doris
diff --git a/be/src/vec/exec/scan/new_olap_scanner.cpp 
b/be/src/vec/exec/scan/new_olap_scanner.cpp
index 707defa1690..095f5af1c2d 100644
--- a/be/src/vec/exec/scan/new_olap_scanner.cpp
+++ b/be/src/vec/exec/scan/new_olap_scanner.cpp
@@ -743,6 +743,8 @@ void NewOlapScanner::_collect_profile_before_close() {
                    
stats.segment_iterator_init_bitmap_index_iterators_timer_ns);
     
COUNTER_UPDATE(local_state->_segment_iterator_init_inverted_index_iterators_timer,
                    
stats.segment_iterator_init_inverted_index_iterators_timer_ns);
+    
COUNTER_UPDATE(local_state->_segment_iterator_init_segment_prefetchers_timer,
+                   stats.segment_iterator_init_segment_prefetchers_timer_ns);
 
     COUNTER_UPDATE(local_state->_segment_create_column_readers_timer,
                    stats.segment_create_column_readers_timer_ns);
diff --git a/be/src/vec/exec/scan/vscanner.cpp 
b/be/src/vec/exec/scan/vscanner.cpp
index 96a384177de..f4db3b98588 100644
--- a/be/src/vec/exec/scan/vscanner.cpp
+++ b/be/src/vec/exec/scan/vscanner.cpp
@@ -22,6 +22,7 @@
 #include "common/config.h"
 #include "pipeline/exec/scan_operator.h"
 #include "runtime/descriptors.h"
+#include "util/concurrency_stats.h"
 #include "util/defer_op.h"
 #include "util/runtime_profile.h"
 #include "vec/core/column_with_type_and_name.h"
@@ -75,6 +76,7 @@ Status VScanner::prepare(RuntimeState* state, const 
VExprContextSPtrs& conjuncts
 
 Status VScanner::get_block_after_projects(RuntimeState* state, 
vectorized::Block* block,
                                           bool* eos) {
+    
SCOPED_CONCURRENCY_COUNT(ConcurrencyStatsManager::instance().vscanner_get_block);
     auto& row_descriptor = _local_state->_parent->row_descriptor();
     if (_output_row_descriptor) {
         
_origin_block.clear_column_data(row_descriptor.num_materialized_slots());


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to