Copilot commented on code in PR #70:
URL: https://github.com/apache/paimon-cpp/pull/70#discussion_r3385189692


##########
src/paimon/common/reader/prefetch_file_batch_reader_impl.cpp:
##########
@@ -0,0 +1,617 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#include "paimon/common/reader/prefetch_file_batch_reader_impl.h"
+
+#include <algorithm>
+#include <chrono>
+#include <future>
+#include <thread>
+
+#include "arrow/array/array_base.h"
+#include "arrow/c/abi.h"
+#include "arrow/c/bridge.h"
+#include "paimon/common/executor/future.h"
+#include "paimon/common/io/cache_input_stream.h"
+#include "paimon/common/metrics/metrics_impl.h"
+#include "paimon/common/reader/reader_utils.h"
+#include "paimon/common/utils/arrow/status_utils.h"
+#include "paimon/common/utils/scope_guard.h"
+#include "paimon/format/reader_builder.h"
+#include "paimon/fs/file_system.h"
+#include "paimon/utils/read_ahead_cache.h"
+
+namespace arrow {
+class Schema;
+}  // namespace arrow
+
+namespace paimon {
+
+/// Validates the arguments, builds `prefetch_max_parallel_num` underlying readers as tasks
+/// on `executor`, and wraps them in a PrefetchFileBatchReaderImpl.
+///
+/// Returns Status::Invalid when `prefetch_max_parallel_num` or `prefetch_batch_count` is 0,
+/// when `batch_size` <= 0, when `reader_builder`, `fs` or `executor` is null, or when a built
+/// reader cannot be cast to PrefetchFileBatchReader. Failures from opening the file or
+/// building a reader are propagated. If `initialize_read_ranges` is true,
+/// RefreshReadRanges() is invoked before the reader is returned.
+Result<std::unique_ptr<PrefetchFileBatchReaderImpl>> 
PrefetchFileBatchReaderImpl::Create(
+    const std::string& data_file_path, const ReaderBuilder* reader_builder,
+    const std::shared_ptr<FileSystem>& fs, uint32_t prefetch_max_parallel_num, 
int32_t batch_size,
+    uint32_t prefetch_batch_count, bool enable_adaptive_prefetch_strategy,
+    const std::shared_ptr<Executor>& executor, bool initialize_read_ranges,
+    PrefetchCacheMode prefetch_cache_mode, const CacheConfig& cache_config,
+    const std::shared_ptr<MemoryPool>& pool) {
+    if (prefetch_max_parallel_num == 0) {
+        return Status::Invalid("prefetch max parallel num should be greater 
than 0.");
+    }
+    if (prefetch_batch_count == 0) {
+        return Status::Invalid("prefetch batch count should be greater than 
0.");
+    }
+    if (batch_size <= 0) {
+        return Status::Invalid("batch size should be greater than 0.");
+    }
+    if (reader_builder == nullptr) {
+        return Status::Invalid("reader_builder should not be nullptr.");
+    }
+    if (fs == nullptr) {
+        return Status::Invalid("file system should not be nullptr.");
+    }
+    if (executor == nullptr) {
+        return Status::Invalid("executor should not be nullptr.");
+    }
+
+    // The read-ahead cache is created only when the cache mode is not NEVER; otherwise
+    // `cache` stays null and a null cache is handed to every CacheInputStream below.
+    std::shared_ptr<ReadAheadCache> cache;
+    if (prefetch_cache_mode != PrefetchCacheMode::NEVER) {
+        PAIMON_ASSIGN_OR_RAISE(std::shared_ptr<InputStream> input_stream, 
fs->Open(data_file_path));
+        cache = std::make_shared<ReadAheadCache>(input_stream, cache_config, 
pool);
+    }
+    // Each reader opens its own input stream and is built as a separate executor task.
+    // NOTE(review): the lambda captures locals/parameters by reference; this relies on
+    // CollectAll() below waiting for every future before any early return - confirm.
+    std::vector<std::future<Result<std::unique_ptr<FileBatchReader>>>> futures;
+    for (uint32_t i = 0; i < prefetch_max_parallel_num; i++) {
+        futures.push_back(Via(executor.get(),
+                              [&fs, &data_file_path, &reader_builder,
+                               &cache]() -> 
Result<std::unique_ptr<FileBatchReader>> {
+                                  
PAIMON_ASSIGN_OR_RAISE(std::unique_ptr<InputStream> input_stream,
+                                                         
fs->Open(data_file_path));
+                                  auto cache_input_stream = 
std::make_shared<CacheInputStream>(
+                                      std::move(input_stream), cache);
+                                  return 
reader_builder->Build(cache_input_stream);
+                              }));
+    }
+    // The first failed build aborts creation; every reader must support the prefetch API.
+    std::vector<std::shared_ptr<PrefetchFileBatchReader>> readers;
+    for (auto& file_batch_reader : CollectAll(futures)) {
+        if (!file_batch_reader.ok()) {
+            return file_batch_reader.status();
+        }
+        std::shared_ptr<FileBatchReader> reader = 
std::move(file_batch_reader).value();
+        auto prefetch_file_batch_reader =
+            std::dynamic_pointer_cast<PrefetchFileBatchReader>(reader);
+        if (prefetch_file_batch_reader == nullptr) {
+            return Status::Invalid(
+                "failed to cast to prefetch file batch reader. file format not 
support prefetch");
+        }
+        readers.emplace_back(prefetch_file_batch_reader);
+    }
+    // Raise the total batch budget to at least one batch per reader, then split it evenly
+    // (integer division) to get the capacity of each reader's queue.
+    if (prefetch_batch_count < readers.size()) {
+        prefetch_batch_count = readers.size();
+    }
+    uint32_t prefetch_queue_capacity = prefetch_batch_count / readers.size();
+
+    auto reader = std::unique_ptr<PrefetchFileBatchReaderImpl>(new 
PrefetchFileBatchReaderImpl(
+        readers, batch_size, prefetch_queue_capacity, 
enable_adaptive_prefetch_strategy, executor,
+        cache, prefetch_cache_mode));
+    if (initialize_read_ranges) {
+        // normally initialize read ranges should be false, as set read schema 
will refresh read
+        // ranges, and set read schema will always be called before read.
+        PAIMON_RETURN_NOT_OK(reader->RefreshReadRanges());
+    }
+    return reader;
+}
+
+/// Wraps an already-built set of underlying readers.
+///
+/// For every reader one prefetch queue, one position counter (starting at 0) and one
+/// "is working" flag (starting at false) are created; `parallel_num_` is set to the number
+/// of readers.
+PrefetchFileBatchReaderImpl::PrefetchFileBatchReaderImpl(
+    const std::vector<std::shared_ptr<PrefetchFileBatchReader>>& readers, int32_t batch_size,
+    uint32_t prefetch_queue_capacity, bool enable_adaptive_prefetch_strategy,
+    const std::shared_ptr<Executor>& executor, const std::shared_ptr<ReadAheadCache>& cache,
+    PrefetchCacheMode cache_mode)
+    // `readers` is a const reference: std::move on it would silently fall back to a copy,
+    // so copy explicitly rather than suggest a move that cannot happen.
+    : readers_(readers),
+      batch_size_(batch_size),
+      executor_(executor),
+      cache_(cache),
+      cache_mode_(cache_mode),
+      prefetch_queue_capacity_(prefetch_queue_capacity),
+      enable_adaptive_prefetch_strategy_(enable_adaptive_prefetch_strategy) {
+    // The per-reader containers all end up with exactly readers_.size() entries.
+    prefetch_queues_.reserve(readers_.size());
+    readers_pos_.reserve(readers_.size());
+    reader_is_working_.reserve(readers_.size());
+    for (size_t i = 0; i < readers_.size(); i++) {
+        prefetch_queues_.emplace_back(std::make_unique<ThreadsafeQueue<PrefetchBatch>>());
+        readers_pos_.emplace_back(std::make_unique<std::atomic<uint64_t>>(0));
+        reader_is_working_.emplace_back(false);
+    }
+    parallel_num_ = readers_.size();
+}
+
+// Runs CleanUp() on destruction; the returned Status is explicitly discarded.
+PrefetchFileBatchReaderImpl::~PrefetchFileBatchReaderImpl() {
+    (void)CleanUp();
+}
+
+/// Applies the read schema, predicate and selection bitmap to every underlying reader and
+/// then recomputes the read ranges.
+///
+/// The incoming C schema is imported once; each reader is handed its own freshly exported
+/// copy. The first failing reader aborts the call and its status is returned.
+Status PrefetchFileBatchReaderImpl::SetReadSchema(
+    ::ArrowSchema* read_schema, const std::shared_ptr<Predicate>& predicate,
+    const std::optional<RoaringBitmap32>& selection_bitmap) {
+    PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(std::shared_ptr<arrow::Schema> imported_schema,
+                                      arrow::ImportSchema(read_schema));
+    for (size_t idx = 0; idx < readers_.size(); idx++) {
+        auto exported_schema = std::make_unique<::ArrowSchema>();
+        PAIMON_RETURN_NOT_OK_FROM_ARROW(
+            arrow::ExportSchema(*imported_schema, exported_schema.get()));
+        PAIMON_RETURN_NOT_OK(
+            readers_[idx]->SetReadSchema(exported_schema.get(), predicate, selection_bitmap));
+    }
+    // Remember the filters; RefreshReadRanges() and NeedInitCache() consult them.
+    predicate_ = predicate;
+    selection_bitmap_ = selection_bitmap;
+    return RefreshReadRanges();
+}
+
+/// Resets the reader state and recomputes the read ranges from the first underlying reader.
+///
+/// Prefetching is always enabled when the adaptive strategy is off. With the adaptive
+/// strategy on, the first reader's suggestion is used, except that prefetching is turned off
+/// when the first range alone spans more batches than one prefetch queue can hold.
+/// The ranges are filtered by the selection bitmap before being installed.
+Status PrefetchFileBatchReaderImpl::RefreshReadRanges() {
+    PAIMON_RETURN_NOT_OK(CleanUp());
+    // Initialised so that a GenReadRanges() implementation which does not write the
+    // out-parameter cannot cause a read of an indeterminate value below.
+    bool need_prefetch = false;
+    PAIMON_ASSIGN_OR_RAISE(auto read_ranges, readers_[0]->GenReadRanges(&need_prefetch));
+
+    if (!enable_adaptive_prefetch_strategy_) {
+        need_prefetch = true;
+    } else if (need_prefetch && !read_ranges.empty()) {
+        // Number of full batches covered by the first range (integer division).
+        uint64_t batch_count_in_range =
+            (read_ranges[0].second - read_ranges[0].first) / batch_size_;
+        if (batch_count_in_range > static_cast<uint64_t>(prefetch_queue_capacity_)) {
+            need_prefetch = false;
+        }
+    }
+
+    need_prefetch_ = need_prefetch;
+    PAIMON_RETURN_NOT_OK(SetReadRanges(FilterReadRanges(read_ranges, selection_bitmap_)));
+    read_ranges_freshed_ = true;
+
+    return Status::OK();
+}
+
+/// Returns the read ranges that the selection bitmap touches.
+///
+/// Without a bitmap every range is returned unchanged. With a bitmap, a range is kept only
+/// if `ContainsAny(range.first, range.second)` holds; the input order is preserved.
+std::vector<std::pair<uint64_t, uint64_t>> PrefetchFileBatchReaderImpl::FilterReadRanges(
+    const std::vector<std::pair<uint64_t, uint64_t>>& read_ranges,
+    const std::optional<RoaringBitmap32>& selection_bitmap) {
+    if (!selection_bitmap.has_value()) {
+        return read_ranges;
+    }
+    const RoaringBitmap32& bitmap = *selection_bitmap;
+    std::vector<std::pair<uint64_t, uint64_t>> kept;
+    for (size_t idx = 0; idx < read_ranges.size(); idx++) {
+        const std::pair<uint64_t, uint64_t>& candidate = read_ranges[idx];
+        if (!bitmap.ContainsAny(candidate.first, candidate.second)) {
+            continue;
+        }
+        kept.push_back(candidate);
+    }
+    return kept;
+}
+
+/// Installs `read_ranges`: splits them across the readers, optionally pushes each share down
+/// to its reader, and appends them (plus a trailing EOF range) to the bookkeeping lists.
+///
+/// Returns the first non-OK status from a reader's SetReadRanges(), or the error from
+/// EofRange(); otherwise Status::OK().
+Status PrefetchFileBatchReaderImpl::SetReadRanges(
+    const std::vector<std::pair<uint64_t, uint64_t>>& read_ranges) {
+    // push down read ranges for reducing IO amplification
+    const size_t reader_count = readers_.size();
+    read_ranges_in_group_ = DispatchReadRanges(read_ranges, reader_count);
+    // if prefetching isn't necessary, then setting read ranges won't be needed either.
+    const bool push_down_to_readers = need_prefetch_ && reader_count > 1;
+    if (push_down_to_readers) {
+        std::vector<std::future<Status>> pending;
+        for (size_t idx = 0; idx < reader_count; idx++) {
+            pending.push_back(Via(executor_.get(), [this, idx]() -> Status {
+                return readers_[idx]->SetReadRanges(read_ranges_in_group_[idx]);
+            }));
+        }
+        for (const auto& set_status : CollectAll(pending)) {
+            if (!set_status.ok()) {
+                return set_status;
+            }
+        }
+    }
+    read_ranges_.insert(read_ranges_.end(), read_ranges.begin(), read_ranges.end());
+    // Note: add a special read range out of file row count, for trigger an EOF access.
+    std::pair<uint64_t, uint64_t> eof_range;
+    PAIMON_ASSIGN_OR_RAISE(eof_range, EofRange());
+    read_ranges_.push_back(eof_range);
+    for (auto& group_ranges : read_ranges_in_group_) {
+        group_ranges.push_back(eof_range);
+    }
+    return Status::OK();
+}
+
+/// Distributes `read_ranges` round-robin over `group_count` groups.
+///
+/// Range i goes to group `i % group_count`, so each group keeps the relative order of the
+/// input. The result always has exactly `group_count` entries (some possibly empty).
+/// A `group_count` of 0 yields an empty result instead of dividing by zero.
+std::vector<std::vector<std::pair<uint64_t, uint64_t>>>
+PrefetchFileBatchReaderImpl::DispatchReadRanges(
+    const std::vector<std::pair<uint64_t, uint64_t>>& read_ranges, size_t group_count) {
+    std::vector<std::vector<std::pair<uint64_t, uint64_t>>> read_ranges_in_group;
+    if (group_count == 0) {
+        // `i % 0` is undefined behaviour; with no groups there is nothing to dispatch to.
+        return read_ranges_in_group;
+    }
+    read_ranges_in_group.resize(group_count);
+    for (size_t i = 0; i < read_ranges.size(); i++) {
+        read_ranges_in_group[i % group_count].push_back(read_ranges[i]);
+    }
+    return read_ranges_in_group;
+}
+
+/// Stops the background work loop (if one was started) and resets all per-read state:
+/// read ranges, prefetch queues, reader positions, working flags, the cache (if any) and
+/// the recorded read status.
+///
+/// Returns Status::Invalid if a background thread object exists but is not joinable;
+/// otherwise Status::OK().
+Status PrefetchFileBatchReaderImpl::CleanUp() {
+    // Drains every prefetch queue, releasing each popped batch. cv_ is notified after every
+    // pop attempt, including the final one that finds the queue empty.
+    auto clean_prefetch_queue = [this]() {
+        for (auto& prefetch_queue : prefetch_queues_) {
+            while (true) {
+                std::optional<PrefetchBatch> batch = prefetch_queue->try_pop();
+                {
+                    std::unique_lock<std::mutex> lock(working_mutex_);
+                    cv_.notify_one();
+                }
+                if (batch == std::nullopt) {
+                    break;
+                }
+                
ReaderUtils::ReleaseReadBatch(std::move(batch.value().batch.first));
+            }
+        }
+    };
+    // Clear the existing read ranges and prefetch queue
+    {
+        std::unique_lock<std::mutex> lock(working_mutex_);
+        is_shutdown_ = true;  // set is shutdown and check shutdown to avoid 
block at queue.push
+        cv_.notify_one();
+    }
+    // Join and reset the background thread if it exists
+    if (background_thread_) {
+        if (background_thread_->joinable()) {
+            background_thread_->join();
+            background_thread_.reset();
+        } else {
+            return Status::Invalid("background thread is not joinable");
+        }
+    }
+
+    // The background thread has been joined (or never existed) by this point, so the
+    // state below is reset without the work loop running.
+    read_ranges_.clear();
+    read_ranges_in_group_.clear();
+    clean_prefetch_queue();
+    for (size_t i = 0; i < readers_pos_.size(); i++) {
+        readers_pos_[i]->store(0);
+        reader_is_working_[i] = false;
+    }
+    // Clear the shutdown flag again so the reader can be used for another round of reads.
+    is_shutdown_ = false;
+    if (cache_) {
+        cache_->Reset();
+    }
+    SetReadStatus(Status::OK());
+    return Status::OK();
+}
+
+/// Decides, from the cache mode and the currently installed filters, whether the read-ahead
+/// cache should be initialised.
+///
+/// NEVER -> false, ALWAYS -> true; the EXCLUDE_* modes return true only when the named
+/// filter(s) are absent. An unknown mode asserts in debug builds and returns true.
+bool PrefetchFileBatchReaderImpl::NeedInitCache() const {
+    if (cache_mode_ == PrefetchCacheMode::NEVER) {
+        return false;
+    }
+    if (cache_mode_ == PrefetchCacheMode::ALWAYS) {
+        return true;
+    }
+    const bool no_predicate = (predicate_ == nullptr);
+    const bool no_bitmap = !selection_bitmap_.has_value();
+    if (cache_mode_ == PrefetchCacheMode::EXCLUDE_PREDICATE) {
+        return no_predicate;
+    }
+    if (cache_mode_ == PrefetchCacheMode::EXCLUDE_BITMAP) {
+        return no_bitmap;
+    }
+    if (cache_mode_ == PrefetchCacheMode::EXCLUDE_BITMAP_OR_PREDICATE) {
+        return no_predicate && no_bitmap;
+    }
+    assert(false);
+    return true;
+}
+
+/// Body of the background thread. Optionally initialises the read-ahead cache, then keeps
+/// scheduling ReadBatch() tasks on the executor (at most one in flight per reader) until an
+/// error status has been recorded, shutdown has been requested, or every reader position
+/// equals the uint64 max sentinel.
+void PrefetchFileBatchReaderImpl::Workloop() {
+    // One future slot per reader; a default-constructed (invalid) future means that no task
+    // has been scheduled for that reader yet.
+    std::vector<std::future<void>> futures;
+    futures.resize(readers_.size());
+    // The pre-buffer ranges come from the first reader. Any failure here is recorded as the
+    // read status, which makes the loop below exit on its first check.
+    if (cache_ && NeedInitCache()) {
+        auto read_ranges = readers_[0]->PreBufferRange();
+        if (read_ranges.ok()) {
+            std::vector<ByteRange> ranges;
+            for (const auto& read_range : read_ranges.value()) {
+                ranges.emplace_back(read_range.first, read_range.second);
+            }
+            auto s = cache_->Init(std::move(ranges));
+            if (!s.ok()) {
+                SetReadStatus(s);
+            }
+        } else {
+            SetReadStatus(read_ranges.status());
+        }
+    }
+
+    while (true) {
+        if (!GetReadStatus().ok()) {
+            break;
+        }
+        if (is_shutdown_) {
+            break;
+        }
+        // uint64 max is the position stored once a reader has queued its EOF batch.
+        bool all_finished = true;
+        for (const auto& reader_pos : readers_pos_) {
+            if (reader_pos->load() != std::numeric_limits<uint64_t>::max()) {
+                all_finished = false;
+            }
+        }
+        if (all_finished) {
+            break;
+        }
+
+        bool made_progress_this_iteration = false;
+        for (size_t reader_idx = 0; reader_idx < readers_.size(); 
reader_idx++) {
+            // A reader can take a new task when it has none in flight: its future is either
+            // invalid (never scheduled / already consumed) or already ready.
+            if (!futures[reader_idx].valid() ||
+                (futures[reader_idx].wait_for(std::chrono::microseconds(0)) ==
+                 std::future_status::ready)) {
+                // Consume the finished future so the slot becomes invalid again.
+                if (futures[reader_idx].valid()) {
+                    futures[reader_idx].get();
+                }
+                if (prefetch_queues_[reader_idx]->size() >= 
prefetch_queue_capacity_) {
+                    // queue is full, skip
+                    continue;
+                }
+                if (readers_pos_[reader_idx]->load() != 
std::numeric_limits<uint64_t>::max()) {
+                    futures[reader_idx] =
+                        Via(executor_.get(), [this, reader_idx]() { 
ReadBatch(reader_idx); });
+                    made_progress_this_iteration = true;
+                }
+            }
+        }
+        // Nothing could be scheduled: block until shutdown is requested or until some
+        // reader is idle, unfinished and has room in its queue.
+        if (!made_progress_this_iteration) {
+            std::unique_lock<std::mutex> lock(working_mutex_);
+            cv_.wait(lock, [this] {
+                if (is_shutdown_) {
+                    return true;
+                }
+                for (size_t i = 0; i < reader_is_working_.size(); i++) {
+                    if (reader_is_working_[i]) {
+                        continue;
+                    }
+                    if (prefetch_queues_[i]->size() >= 
prefetch_queue_capacity_) {
+                        continue;
+                    }
+                    if (readers_pos_[i]->load() == 
std::numeric_limits<uint64_t>::max()) {
+                        continue;
+                    }
+                    return true;
+                }
+                return false;
+            });
+        }
+    }
+    // NOTE(review): presumably blocks until all outstanding tasks have finished, so that no
+    // task touches this object after the thread exits - confirm against Wait().
+    Wait(futures);
+}
+
+/// Executor task entry point: performs one DoReadBatch() for `reader_idx` and records a
+/// failure as the shared read status (a success leaves the status untouched).
+void PrefetchFileBatchReaderImpl::ReadBatch(size_t reader_idx) {
+    if (Status read_result = DoReadBatch(reader_idx); !read_result.ok()) {
+        SetReadStatus(read_result);
+    }
+}
+
+/// Returns the first range assigned to reader `reader_idx` whose end lies beyond the
+/// reader's current position, or std::nullopt when no such range is left.
+std::optional<std::pair<uint64_t, uint64_t>> PrefetchFileBatchReaderImpl::GetCurrentReadRange(
+    size_t reader_idx) const {
+    const auto& assigned_ranges = read_ranges_in_group_[reader_idx];
+    const uint64_t position = readers_pos_[reader_idx]->load();
+
+    const auto match = std::find_if(assigned_ranges.begin(), assigned_ranges.end(),
+                                    [position](const std::pair<uint64_t, uint64_t>& range) {
+                                        return position < range.second;
+                                    });
+    if (match == assigned_ranges.end()) {
+        return std::nullopt;
+    }
+    return *match;
+}
+
+/// Makes sure reader `reader_idx` will read next from the later of its recorded position
+/// and the start of `current_read_range`; seeks only when it is not already there.
+Status PrefetchFileBatchReaderImpl::EnsureReaderPosition(
+    size_t reader_idx, const std::pair<uint64_t, uint64_t>& current_read_range) const {
+    const uint64_t target_row =
+        std::max(readers_pos_[reader_idx]->load(), current_read_range.first);
+    const auto& reader = readers_[reader_idx];
+    if (reader->GetNextRowToRead() == target_row) {
+        return Status::OK();
+    }
+    return reader->SeekToRow(target_row);
+}
+
+/// Post-processes one batch produced by reader `reader_idx` while it was working on
+/// `read_range`: advances the reader's position and, unless the batch is dropped, pushes it
+/// onto that reader's prefetch queue.
+///
+/// A non-EOF batch is dropped when it starts at or after the end of the range, or when its
+/// bitmap is empty; a batch that overruns the range is sliced down to the range first.
+/// An EOF batch is queued tagged with EofRange() and the position is set to uint64 max.
+Status PrefetchFileBatchReaderImpl::HandleReadResult(
+    size_t reader_idx, const std::pair<uint64_t, uint64_t>& read_range,
+    ReadBatchWithBitmap&& read_batch_with_bitmap) {
+    PAIMON_ASSIGN_OR_RAISE(uint64_t first_row_number,
+                           
readers_[reader_idx]->GetPreviousBatchFirstRowNumber());
+    auto& prefetch_queue = prefetch_queues_[reader_idx];
+    if (!BatchReader::IsEofBatch(read_batch_with_bitmap)) {
+        auto& [read_batch, bitmap] = read_batch_with_bitmap;
+        auto& [c_array, c_schema] = read_batch;
+
+        if (first_row_number >= read_range.second) {
+            // fully out of range, data before first_row_number has been 
filtered out
+            readers_pos_[reader_idx]->store(first_row_number);
+            ReaderUtils::ReleaseReadBatch(std::move(read_batch));
+            return Status::OK();
+        } else if (first_row_number + c_array->length > read_range.second) {
+            // partially out of range, data before read_range.second has been 
effectively consumed
+            readers_pos_[reader_idx]->store(read_range.second);
+            PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(std::shared_ptr<arrow::Array> 
src_array,
+                                              
arrow::ImportArray(c_array.get(), c_schema.get()));
+            // Keep only the leading rows that fall inside the range, re-export the slice into
+            // the same C structs, and clear the bitmap entries for the rows cut off.
+            // NOTE(review): the uint64 difference is narrowed to int32_t; it is smaller than
+            // the batch length here, so this assumes batches stay below INT32_MAX rows.
+            int32_t target_length = read_range.second - first_row_number;
+            auto array = src_array->Slice(/*offset=*/0, target_length);
+            PAIMON_RETURN_NOT_OK_FROM_ARROW(
+                arrow::ExportArray(*array, c_array.get(), c_schema.get()));
+            bitmap.RemoveRange(target_length, src_array->length());
+        } else {
+            // all within the range, data before 
readers_[reader_idx]->GetNextRowToRead() has been
+            // effectively consumed
+            
readers_pos_[reader_idx]->store(readers_[reader_idx]->GetNextRowToRead());
+        }
+        // No row of this batch is selected: release it instead of queueing it.
+        if (bitmap.IsEmpty()) {
+            ReaderUtils::ReleaseReadBatch(std::move(read_batch));
+            return Status::OK();
+        }
+        prefetch_queue->push({read_range, std::move(read_batch_with_bitmap), 
first_row_number});
+    } else {
+        // EOF: queue the EOF marker, then flag this reader as finished via the sentinel.
+        std::pair<uint64_t, uint64_t> eof_range;
+        PAIMON_ASSIGN_OR_RAISE(eof_range, EofRange());
+        prefetch_queue->push({eof_range, std::move(read_batch_with_bitmap), 
first_row_number});
+        readers_pos_[reader_idx]->store(std::numeric_limits<uint64_t>::max());
+    }
+    return Status::OK();
+}
+
+/// Reads the next batch for reader `reader_idx` inside its current read range and passes it
+/// to HandleReadResult().
+///
+/// Returns the recorded read status if it is already an error. Returns OK without reading
+/// when shutdown has been requested or when the reader has no read range left. Errors from
+/// seeking, reading or result handling are returned to the caller.
+Status PrefetchFileBatchReaderImpl::DoReadBatch(size_t reader_idx) {
+    PAIMON_RETURN_NOT_OK(GetReadStatus());
+    if (is_shutdown_) {
+        return Status::OK();
+    }
+    std::optional<std::pair<uint64_t, uint64_t>> current_read_range =
+        GetCurrentReadRange(reader_idx);
+    if (current_read_range == std::nullopt) {
+        // No more read ranges for this reader, gracefully exit.
+        return Status::OK();
+    }
+    // Clears this reader's working flag and notifies cv_ under working_mutex_.
+    // NOTE(review): presumably ScopeGuard runs the callback when it goes out of scope,
+    // i.e. on every return path below - confirm against scope_guard.h.
+    ScopeGuard guard([&]() {
+        std::unique_lock<std::mutex> lock(working_mutex_);
+        reader_is_working_[reader_idx] = false;
+        cv_.notify_one();
+    });
+    {
+        std::unique_lock<std::mutex> lock(working_mutex_);
+        reader_is_working_[reader_idx] = true;
+    }
+
+    const auto& read_range = current_read_range.value();
+    FileBatchReader* reader = readers_[reader_idx].get();
+    PAIMON_RETURN_NOT_OK(EnsureReaderPosition(reader_idx, read_range));
+
+    PAIMON_ASSIGN_OR_RAISE(ReadBatchWithBitmap read_batch_with_bitmap,
+                           reader->NextBatchWithBitmap());
+
+    return HandleReadResult(reader_idx, read_range, 
std::move(read_batch_with_bitmap));
+}
+
+Result<BatchReader::ReadBatchWithBitmap> 
PrefetchFileBatchReaderImpl::NextBatchWithBitmap() {
+    if (!read_ranges_freshed_) {
+        return Status::Invalid("prefetch reader read ranges are not 
initialized");
+    }
+    if (!background_thread_) {
+        background_thread_ =
+            
std::make_unique<std::thread>(&PrefetchFileBatchReaderImpl::Workloop, this);
+    }
+
+    while (true) {
+        PAIMON_RETURN_NOT_OK(GetReadStatus());
+        if (is_shutdown_) {
+            return Status::Invalid(
+                "prefetch reader has inconsistent state, maybe read while 
closing reader or change "
+                "read schema");
+        }

Review Comment:
   `read_ranges_freshed_` is a plain `bool` read here and written in 
`RefreshReadRanges()` without synchronization, which is a data race. 
Additionally, `CleanUp()` clears shared structures (`read_ranges_`, 
`read_ranges_in_group_`, queues, etc.) while `NextBatchWithBitmap()` 
concurrently reads them; checking `is_shutdown_` alone doesn’t prevent races 
because `CleanUp()` can set `is_shutdown_` and clear containers between checks. 
Fix (mandatory) by introducing a dedicated mutex (or reusing an existing one) 
to protect all accesses/mutations of `read_ranges_*` and queue/front/pop logic, 
and make `read_ranges_freshed_` atomic or guard it under the same lock.



##########
src/paimon/common/reader/prefetch_file_batch_reader_impl.h:
##########
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#pragma once
+
+#include <atomic>
+#include <cassert>
+#include <condition_variable>
+#include <cstddef>
+#include <cstdint>
+#include <deque>
+#include <limits>
+#include <memory>
+#include <mutex>
+#include <optional>
+#include <shared_mutex>
+#include <string>
+#include <thread>
+#include <utility>
+#include <vector>
+
+#include "arrow/c/abi.h"
+#include "paimon/common/utils/threadsafe_queue.h"
+#include "paimon/reader/batch_reader.h"
+#include "paimon/reader/prefetch_file_batch_reader.h"
+#include "paimon/result.h"
+#include "paimon/status.h"
+#include "paimon/utils/read_ahead_cache.h"
+#include "paimon/utils/roaring_bitmap32.h"
+
+struct ArrowSchema;
+
+namespace paimon {
+
+class ReaderBuilder;
+class FileSystem;
+class Executor;
+class Predicate;
+class Metrics;
+
+class PrefetchFileBatchReaderImpl : public PrefetchFileBatchReader {
+ public:
+    static Result<std::unique_ptr<PrefetchFileBatchReaderImpl>> Create(
+        const std::string& data_file_path, const ReaderBuilder* reader_builder,
+        const std::shared_ptr<FileSystem>& fs, uint32_t 
prefetch_max_parallel_num,
+        int32_t batch_size, uint32_t prefetch_batch_count, bool 
enable_adaptive_prefetch_strategy,
+        const std::shared_ptr<Executor>& executor, bool initialize_read_ranges,
+        PrefetchCacheMode prefetch_cache_mode, const CacheConfig& cache_config,
+        const std::shared_ptr<MemoryPool>& pool);
+
+    ~PrefetchFileBatchReaderImpl() override;
+
+    Result<FileBatchReader::ReadBatch> NextBatch() override {
+        return Status::Invalid(
+            "paimon inner reader PrefetchFileBatchReader should use 
NextBatchWithBitmap");
+    }
+    Result<FileBatchReader::ReadBatchWithBitmap> NextBatchWithBitmap() 
override;
+
+    std::shared_ptr<Metrics> GetReaderMetrics() const override;
+
+    Result<std::unique_ptr<::ArrowSchema>> GetFileSchema() const override;
+    Status SetReadSchema(::ArrowSchema* read_schema, const 
std::shared_ptr<Predicate>& predicate,
+                         const std::optional<RoaringBitmap32>& 
selection_bitmap) override;
+
+    Status SeekToRow(uint64_t row_number) override;
+    Result<uint64_t> GetPreviousBatchFirstRowNumber() const override;
+    Result<uint64_t> GetNumberOfRows() const override;
+    uint64_t GetNextRowToRead() const override;
+    void Close() override;
+    Status SetReadRanges(const std::vector<std::pair<uint64_t, uint64_t>>& 
read_ranges) override;
+
+    Result<std::vector<std::pair<uint64_t, uint64_t>>> GenReadRanges(
+        bool* need_prefetch) const override {
+        assert(false);
+        return Status::NotImplemented("gen read ranges not implemented");
+    }
+    bool SupportPreciseBitmapSelection() const override {
+        return readers_[0]->SupportPreciseBitmapSelection();
+    }
+
+    Status RefreshReadRanges();
+
+    inline PrefetchFileBatchReader* GetFirstReader() const {
+        return readers_[0].get();
+    }
+
+    inline bool NeedPrefetch() const {
+        return need_prefetch_;
+    }
+
+ private:
+    struct PrefetchBatch {
+        std::pair<uint64_t, uint64_t> read_range;
+        BatchReader::ReadBatchWithBitmap batch;
+        uint64_t previous_batch_first_row_num;
+    };
+
+    PrefetchFileBatchReaderImpl(
+        const std::vector<std::shared_ptr<PrefetchFileBatchReader>>& readers, 
int32_t batch_size,
+        uint32_t prefetch_queue_capacity, bool 
enable_adaptive_prefetch_strategy,
+        const std::shared_ptr<Executor>& executor, const 
std::shared_ptr<ReadAheadCache>& cache,
+        PrefetchCacheMode cache_mode);
+
+    Status CleanUp();
+    void Workloop();
+    void SetReadStatus(const Status& status);
+    Status GetReadStatus() const;
+    Result<bool> IsEofRange(const std::pair<uint64_t, uint64_t>& read_range) 
const;
+    Status DoReadBatch(size_t reader_idx);
+    void ReadBatch(size_t reader_idx);
+    size_t GetEnabledReaderSize() const;
+    static std::vector<std::pair<uint64_t, uint64_t>> FilterReadRanges(
+        const std::vector<std::pair<uint64_t, uint64_t>>& read_range,
+        const std::optional<RoaringBitmap32>& selection_bitmap);
+
+    static std::vector<std::vector<std::pair<uint64_t, uint64_t>>> 
DispatchReadRanges(
+        const std::vector<std::pair<uint64_t, uint64_t>>& read_ranges, size_t 
reader_count);
+
+    Result<std::pair<uint64_t, uint64_t>> EofRange() const;
+    std::optional<std::pair<uint64_t, uint64_t>> GetCurrentReadRange(size_t 
reader_idx) const;
+    Status EnsureReaderPosition(size_t reader_idx,
+                                const std::pair<uint64_t, uint64_t>& 
read_range) const;
+    Status HandleReadResult(size_t reader_idx, const std::pair<uint64_t, 
uint64_t>& read_range,
+                            FileBatchReader::ReadBatchWithBitmap&& 
read_batch_with_bitmap);
+    bool NeedInitCache() const;
+
+ private:
+    std::vector<std::shared_ptr<PrefetchFileBatchReader>> readers_;
+    // The meaning of readers_pos_ is: all data before this pos has been 
filtered out or effectively
+    // consumed, and the data after this pos may need to be read in the next 
round of reading.
+    std::vector<std::unique_ptr<std::atomic<uint64_t>>> readers_pos_;
+    std::vector<std::unique_ptr<std::atomic<uint64_t>>> seek_cnt_;
+    const int32_t batch_size_;
+    std::optional<RoaringBitmap32> selection_bitmap_;
+    std::shared_ptr<Predicate> predicate_;
+    std::deque<std::pair<uint64_t, uint64_t>> read_ranges_;
+    std::vector<std::vector<std::pair<uint64_t, uint64_t>>> 
read_ranges_in_group_;
+    std::vector<std::unique_ptr<ThreadsafeQueue<PrefetchBatch>>> 
prefetch_queues_;
+    std::vector<bool> reader_is_working_;
+    std::mutex working_mutex_;
+    std::condition_variable cv_;
+    std::shared_ptr<Executor> executor_;
+    std::shared_ptr<ReadAheadCache> cache_;
+    PrefetchCacheMode cache_mode_;
+
+    mutable std::shared_mutex rw_mutex_;
+    std::unique_ptr<std::thread> background_thread_;
+    Status read_status_;
+    std::atomic<bool> is_shutdown_ = false;
+    uint64_t previous_batch_first_row_num_ = 
std::numeric_limits<uint64_t>::max();
+    bool need_prefetch_ = false;
+    bool read_ranges_freshed_ = false;

Review Comment:
   Typo in the member name: `read_ranges_freshed_` should be 
`read_ranges_refreshed_` (or similar). This improves clarity and avoids 
propagating a misspelling throughout the codebase.



##########
src/paimon/common/reader/prefetch_file_batch_reader_impl_test.cpp:
##########
@@ -0,0 +1,914 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#include "paimon/common/reader/prefetch_file_batch_reader_impl.h"
+
+#include <atomic>
+#include <set>
+
+#include "arrow/compute/api.h"
+#include "arrow/ipc/api.h"
+#include "gtest/gtest.h"
+#include "paimon/common/utils/path_util.h"
+#include "paimon/executor.h"
+#include "paimon/format/file_format.h"
+#include "paimon/format/file_format_factory.h"
+#include "paimon/format/format_writer.h"
+#include "paimon/fs/file_system_factory.h"
+#include "paimon/fs/local/local_file_system.h"
+#include "paimon/predicate/predicate_builder.h"
+#include "paimon/testing/mock/mock_file_batch_reader.h"
+#include "paimon/testing/mock/mock_file_system.h"
+#include "paimon/testing/mock/mock_format_reader_builder.h"
+#include "paimon/testing/utils/read_result_collector.h"
+#include "paimon/testing/utils/testharness.h"
+#include "paimon/utils/read_ahead_cache.h"
+
+namespace paimon::test {
+
+/// MockFileBatchReader whose GenReadRanges() result, prefetch hint and SetReadRanges()
+/// outcome are all fixed by the test at construction time.
+class ControlledMockFileBatchReader : public MockFileBatchReader {
+ public:
+    ControlledMockFileBatchReader(const std::shared_ptr<arrow::Array>& data,
+                                  const std::shared_ptr<arrow::DataType>& file_schema,
+                                  int32_t read_batch_size,
+                                  std::vector<std::pair<uint64_t, uint64_t>> read_ranges,
+                                  bool need_prefetch,
+                                  Status set_read_ranges_status = Status::OK())
+        : MockFileBatchReader(data, file_schema, read_batch_size),
+          read_ranges_(std::move(read_ranges)),
+          need_prefetch_(need_prefetch),
+          set_read_ranges_status_(std::move(set_read_ranges_status)) {}
+
+    /// Reports the configured prefetch hint through `need_prefetch` and returns the
+    /// configured read ranges.
+    Result<std::vector<std::pair<uint64_t, uint64_t>>> GenReadRanges(
+        bool* need_prefetch) const override {
+        *need_prefetch = need_prefetch_;
+        return read_ranges_;
+    }
+
+    /// Returns the injected status when it is an error; otherwise defers to the base mock.
+    Status SetReadRanges(const std::vector<std::pair<uint64_t, uint64_t>>& read_ranges) override {
+        if (set_read_ranges_status_.ok()) {
+            return MockFileBatchReader::SetReadRanges(read_ranges);
+        }
+        return set_read_ranges_status_;
+    }
+
+ private:
+    // Ranges handed back verbatim by GenReadRanges().
+    std::vector<std::pair<uint64_t, uint64_t>> read_ranges_;
+    // Value written to GenReadRanges()'s out-parameter.
+    bool need_prefetch_ = true;
+    // Status returned by SetReadRanges() when it is not OK.
+    Status set_read_ranges_status_;
+};
+
+/// ReaderBuilder test double that produces ControlledMockFileBatchReader instances.
+///
+/// The reader created by the i-th call to Build(stream) is given
+/// `set_read_ranges_statuses[i]` as its injected SetReadRanges() status, or Status::OK()
+/// once the list is exhausted. Building from a path is not supported.
+class ControlledMockFormatReaderBuilder : public ReaderBuilder {
+ public:
+    ControlledMockFormatReaderBuilder(const std::shared_ptr<arrow::Array>& data,
+                                      const std::shared_ptr<arrow::DataType>& schema,
+                                      int32_t read_batch_size,
+                                      std::vector<std::pair<uint64_t, uint64_t>> read_ranges,
+                                      bool need_prefetch,
+                                      std::vector<Status> set_read_ranges_statuses)
+        : data_(data),
+          schema_(schema),
+          read_batch_size_(read_batch_size),
+          read_ranges_(std::move(read_ranges)),
+          need_prefetch_(need_prefetch),
+          set_read_ranges_statuses_(std::move(set_read_ranges_statuses)) {}
+
+    /// The memory pool is ignored by this mock; returns `this` to allow chaining.
+    ReaderBuilder* WithMemoryPool(const std::shared_ptr<MemoryPool>& pool) override {
+        return this;
+    }
+
+    /// Creates a reader over the configured data; the input stream itself is not used.
+    Result<std::unique_ptr<FileBatchReader>> Build(
+        const std::shared_ptr<InputStream>& path) const override {
+        // Build() may run on several executor threads at once (the prefetch reader creates
+        // its underlying readers as parallel tasks), so the call counter has to be atomic;
+        // a plain `mutable size_t` incremented here would be a data race.
+        const size_t index = build_count_.fetch_add(1);
+        Status set_read_ranges_status = index < set_read_ranges_statuses_.size()
+                                            ? set_read_ranges_statuses_[index]
+                                            : Status::OK();
+        return std::make_unique<ControlledMockFileBatchReader>(
+            data_, schema_, read_batch_size_, read_ranges_, need_prefetch_,
+            set_read_ranges_status);
+    }
+
+    /// Always fails: this mock only supports building from an input stream.
+    Result<std::unique_ptr<FileBatchReader>> Build(const std::string& path) const override {
+        return Status::Invalid("do not support build reader with path in mock format");
+    }
+
+ private:
+    std::shared_ptr<arrow::Array> data_;
+    std::shared_ptr<arrow::DataType> schema_;
+    int32_t read_batch_size_ = 0;
+    std::vector<std::pair<uint64_t, uint64_t>> read_ranges_;
+    bool need_prefetch_ = true;
+    // Per-build injected statuses, indexed by the order in which Build(stream) is called.
+    std::vector<Status> set_read_ranges_statuses_;
+    // Number of Build(stream) calls so far; atomic because Build() is const yet concurrent.
+    mutable std::atomic<size_t> build_count_{0};
+};
+
+struct TestParam {  // one parameter set for the suite
+    std::string file_format;  // format identifier, e.g. "parquet" or "orc"
+    PrefetchCacheMode cache_mode;
+};
+
+class PrefetchFileBatchReaderImplTest : public ::testing::Test,
+                                        public 
::testing::WithParamInterface<TestParam> {  // fixture: shared schema (f0 utf8, f1 int64, f2 boolean), file systems, executor and a unique temp directory
+ public:
+    void SetUp() override {
+        fields_ = {arrow::field("f0", arrow::utf8()), arrow::field("f1", 
arrow::int64()),
+                   arrow::field("f2", arrow::boolean())};
+        data_type_ = arrow::struct_(fields_);
+        mock_fs_ = std::make_shared<MockFileSystem>();
+        local_fs_ = std::make_shared<LocalFileSystem>();
+        executor_ = CreateDefaultExecutor(/*thread_count=*/2);
+        dir_ = ::paimon::test::UniqueTestDirectory::Create();
+        ASSERT_TRUE(dir_);
+    }
+    void TearDown() override {}
+
+    std::shared_ptr<arrow::Array> PrepareArray(int32_t length, int32_t offset 
= 0) {  // builds `length` struct rows for i in [offset, offset + length): f0 = "str_<i>", f1 = i, f2 = (i % 2 != 0)
+        arrow::StructBuilder struct_builder(
+            data_type_, arrow::default_memory_pool(),
+            {std::make_shared<arrow::StringBuilder>(), 
std::make_shared<arrow::Int64Builder>(),
+             std::make_shared<arrow::BooleanBuilder>()});
+        auto string_builder = 
static_cast<arrow::StringBuilder*>(struct_builder.field_builder(0));
+        auto big_int_builder = 
static_cast<arrow::Int64Builder*>(struct_builder.field_builder(1));
+        auto bool_builder = 
static_cast<arrow::BooleanBuilder*>(struct_builder.field_builder(2));
+        for (int32_t i = 0 + offset; i < length + offset; ++i) {
+            EXPECT_TRUE(struct_builder.Append().ok());
+            EXPECT_TRUE(string_builder->Append("str_" + 
std::to_string(i)).ok());
+            EXPECT_TRUE(big_int_builder->Append(i).ok());
+            EXPECT_TRUE(bool_builder->Append(static_cast<bool>(i % 2)).ok());
+        }
+        std::shared_ptr<arrow::Array> array;
+        EXPECT_TRUE(struct_builder.Finish(&array).ok());
+        return array;
+    }
+
+    void PrepareTestData(const std::string& file_format_str,
+                         const std::shared_ptr<arrow::Array>& array, int32_t 
stripe_row_count,
+                         int32_t row_index_stride) const {  // writes `array` to <dir_>/file.<file_format_str> in batches of row_index_stride rows; NOTE(review): stripe_row_count is not used in this body
+        // for simplicity, require array->length() % row_index_stride == 0
+        ASSERT_EQ(array->length() % row_index_stride, 0);
+        arrow::Schema schema(array->type()->fields());
+        ::ArrowSchema c_schema;
+        ASSERT_TRUE(arrow::ExportSchema(schema, &c_schema).ok());
+        ASSERT_OK_AND_ASSIGN(
+            std::unique_ptr<FileFormat> file_format,
+            FileFormatFactory::Get(
+                file_format_str,
+                {{"parquet.write.max-row-group-length", 
std::to_string(row_index_stride)},
+                 {"orc.row.index.stride", std::to_string(row_index_stride)}}));
+
+        ASSERT_OK_AND_ASSIGN(auto writer_builder,
+                             file_format->CreateWriterBuilder(&c_schema, 
1024));
+        ASSERT_OK_AND_ASSIGN(
+            std::shared_ptr<OutputStream> out,
+            local_fs_->Create(PathUtil::JoinPath(dir_->Str(), "file." + 
file_format_str),
+                              /*overwrite=*/false));
+        ASSERT_OK_AND_ASSIGN(auto writer, writer_builder->Build(out, "zstd"));
+
+        int32_t write_batch_count = array->length() / row_index_stride;
+        for (int32_t i = 0; i < write_batch_count; i++) {
+            auto slice = array->Slice(i * row_index_stride, row_index_stride);
+            auto copied_array = arrow::Concatenate({slice}).ValueOr(nullptr);
+            ASSERT_TRUE(copied_array);
+            ::ArrowArray c_array;
+            ASSERT_TRUE(arrow::ExportArray(*copied_array, &c_array).ok());
+            ASSERT_OK(writer->AddBatch(&c_array));
+        }
+        ASSERT_OK(writer->Flush());
+        ASSERT_OK(writer->Finish());
+        ASSERT_OK(out->Flush());
+        ASSERT_OK(out->Close());
+    }
+
+    std::unique_ptr<PrefetchFileBatchReaderImpl> PreparePrefetchReader(
+        const std::string& file_format_str, const arrow::Schema* read_schema,
+        const std::shared_ptr<Predicate>& predicate,
+        const std::optional<RoaringBitmap32>& selection_bitmap, int32_t 
batch_size,
+        int32_t prefetch_max_parallel_num, PrefetchCacheMode cache_mode) const 
{  // opens <dir_>/file.<format> on local_fs_ as a prefetch reader, then applies the read schema, predicate and selection bitmap
+        EXPECT_OK_AND_ASSIGN(std::unique_ptr<FileFormat> file_format,
+                             FileFormatFactory::Get(file_format_str, {}));
+        EXPECT_OK_AND_ASSIGN(auto reader_builder, 
file_format->CreateReaderBuilder(batch_size));
+        EXPECT_OK_AND_ASSIGN(
+            std::unique_ptr<PrefetchFileBatchReaderImpl> reader,
+            PrefetchFileBatchReaderImpl::Create(
+                PathUtil::JoinPath(dir_->Str(), "file." + 
file_format->Identifier()),
+                reader_builder.get(), local_fs_, prefetch_max_parallel_num, 
batch_size,
+                prefetch_max_parallel_num * 2, 
/*enable_adaptive_prefetch_strategy=*/false,
+                CreateDefaultExecutor(prefetch_max_parallel_num - 1),  // NOTE(review): evaluates to 0 when prefetch_max_parallel_num == 1 — confirm the executor accepts that
+                /*initialize_read_ranges=*/false, cache_mode, CacheConfig(), 
GetDefaultPool()));
+        std::unique_ptr<ArrowSchema> c_schema = 
std::make_unique<ArrowSchema>();
+        auto arrow_status = arrow::ExportSchema(*read_schema, c_schema.get());
+        EXPECT_TRUE(arrow_status.ok());
+        EXPECT_OK(reader->SetReadSchema(c_schema.get(), predicate, 
selection_bitmap));
+        return reader;
+    }
+
+    bool HasValue(const std::vector<
+                  
std::unique_ptr<ThreadsafeQueue<PrefetchFileBatchReaderImpl::PrefetchBatch>>>&
+                      prefetch_queues) {  // true if at least one prefetch queue is non-empty
+        for (const auto& queue : prefetch_queues) {
+            if (!queue->empty()) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    bool CheckEqual(const std::shared_ptr<arrow::ChunkedArray>& lhs,
+                    const std::shared_ptr<arrow::ChunkedArray>& rhs) {  // compares two chunked arrays; prints both to stdout when they differ
+        std::string lhs_str, rhs_str;
+        auto print_option = arrow::PrettyPrintOptions::Defaults();
+        print_option.window = 1000;
+        print_option.container_window = 1000;
+        EXPECT_TRUE(arrow::PrettyPrint(*lhs, print_option, &lhs_str).ok());  // both sides are rendered up front, even when the arrays turn out equal
+        EXPECT_TRUE(arrow::PrettyPrint(*rhs, print_option, &rhs_str).ok());
+        bool is_equal = lhs->Equals(rhs);
+        if (!is_equal) {
+            std::cout << "lhs array: " << lhs_str << ", rhs array: " << 
rhs_str;
+        }
+        return is_equal;
+    }
+
+ private:  // NOTE(review): test bodies read these members directly (e.g. data_type_, mock_fs_, executor_) — presumably access is opened up elsewhere in the file; confirm
+    arrow::FieldVector fields_;
+    std::shared_ptr<arrow::DataType> data_type_;
+    std::shared_ptr<FileSystem> mock_fs_;
+    std::shared_ptr<FileSystem> local_fs_;
+    std::unique_ptr<paimon::test::UniqueTestDirectory> dir_;
+    std::shared_ptr<Executor> executor_;
+};
+
+std::vector<TestParam> PrepareTestParam() {  // "parquet" paired with each of the five cache modes; the same five for "orc" when PAIMON_ENABLE_ORC is defined
+    std::vector<TestParam> values = {
+        TestParam{"parquet", PrefetchCacheMode::ALWAYS},
+        TestParam{"parquet", PrefetchCacheMode::EXCLUDE_BITMAP},
+        TestParam{"parquet", PrefetchCacheMode::EXCLUDE_PREDICATE},
+        TestParam{"parquet", PrefetchCacheMode::EXCLUDE_BITMAP_OR_PREDICATE},
+        TestParam{"parquet", PrefetchCacheMode::NEVER}};
+#ifdef PAIMON_ENABLE_ORC
+    values.emplace_back(TestParam{"orc", PrefetchCacheMode::ALWAYS});
+    values.emplace_back(TestParam{"orc", PrefetchCacheMode::EXCLUDE_BITMAP});
+    values.emplace_back(TestParam{"orc", 
PrefetchCacheMode::EXCLUDE_PREDICATE});
+    values.emplace_back(TestParam{"orc", 
PrefetchCacheMode::EXCLUDE_BITMAP_OR_PREDICATE});
+    values.emplace_back(TestParam{"orc", PrefetchCacheMode::NEVER});
+#endif
+    return values;
+}
+
+INSTANTIATE_TEST_SUITE_P(TestParam, PrefetchFileBatchReaderImplTest,
+                         ::testing::ValuesIn(PrepareTestParam()));  // parameter values come from PrepareTestParam()
+
+TEST_F(PrefetchFileBatchReaderImplTest, TestSimple) {  // for each parallelism level, a full read must reproduce the 101-row source array
+    auto data_array = PrepareArray(101);
+    int32_t batch_size = 10;
+    for (auto prefetch_max_parallel_num : {1, 2, 3, 5, 8, 10}) {
+        MockFormatReaderBuilder reader_builder(data_array, data_type_, 
batch_size);
+        ASSERT_OK_AND_ASSIGN(
+            auto reader,
+            PrefetchFileBatchReaderImpl::Create(
+                /*data_file_path=*/"", &reader_builder, mock_fs_, 
prefetch_max_parallel_num,
+                batch_size, prefetch_max_parallel_num * 2,
+                /*enable_adaptive_prefetch_strategy=*/false, executor_,
+                /*initialize_read_ranges=*/true, 
/*prefetch_cache_mode=*/PrefetchCacheMode::ALWAYS,
+                CacheConfig(), GetDefaultPool()));
+        ASSERT_EQ(reader->GetPreviousBatchFirstRowNumber().value(), -1);  // -1 is expected before any batch has been read
+        ASSERT_OK_AND_ASSIGN(auto result_array,
+                             ReadResultCollector::CollectResult(
+                                 reader.get(), /*max simulated data processing 
time*/ 100));
+        ASSERT_EQ(reader->GetPreviousBatchFirstRowNumber().value(), 101);  // equals the total row count once everything has been collected
+        auto expected_array = 
std::make_shared<arrow::ChunkedArray>(data_array);
+        ASSERT_TRUE(result_array->Equals(expected_array));
+    }
+}
+
+TEST_F(PrefetchFileBatchReaderImplTest, TestReadWithLimits) {  // reads only 8 batches, then closes the reader and checks that metrics are still available
+    auto data_array = PrepareArray(101);
+    int32_t batch_size = 10;
+    int32_t prefetch_max_parallel_num = 12;
+
+    MockFormatReaderBuilder reader_builder(data_array, data_type_, batch_size);
+    ASSERT_OK_AND_ASSIGN(
+        auto reader,
+        PrefetchFileBatchReaderImpl::Create(
+            /*data_file_path=*/"", &reader_builder, mock_fs_, 
prefetch_max_parallel_num, batch_size,
+            prefetch_max_parallel_num * 2, 
/*enable_adaptive_prefetch_strategy=*/false, executor_,
+            /*initialize_read_ranges=*/true, 
/*prefetch_cache_mode=*/PrefetchCacheMode::ALWAYS,
+            CacheConfig(), GetDefaultPool()));
+    // simulate a read limit: consume only 8 batches
+    for (int32_t i = 0; i < 8; i++) {
+        ASSERT_OK_AND_ASSIGN(BatchReader::ReadBatchWithBitmap 
batch_with_bitmap,
+                             reader->NextBatchWithBitmap());
+        auto& [batch, bitmap] = batch_with_bitmap;
+        ASSERT_EQ(batch.first->length, bitmap.Cardinality());  // the bitmap must select every row of the returned batch
+        ASSERT_OK_AND_ASSIGN(std::shared_ptr<arrow::Array> array,
+                             ReadResultCollector::GetArray(std::move(batch)));
+        ASSERT_TRUE(array);
+    }
+    reader->Close();
+    // the metrics object must still be obtainable after Close()
+    auto read_metrics = reader->GetReaderMetrics();
+    ASSERT_TRUE(read_metrics);
+}
+
+TEST_F(PrefetchFileBatchReaderImplTest, TestReadWithoutInitializeReadRanges) {
+    auto data_array = PrepareArray(101);
+    int32_t batch_size = 10;
+    int32_t prefetch_max_parallel_num = 12;
+
+    MockFormatReaderBuilder reader_builder(data_array, data_type_, batch_size);
+    ASSERT_OK_AND_ASSIGN(
+        auto reader,
+        PrefetchFileBatchReaderImpl::Create(
+            /*data_file_path=*/"", &reader_builder, mock_fs_, 
prefetch_max_parallel_num, batch_size,
+            prefetch_max_parallel_num * 2, 
/*enable_adaptive_prefetch_strategy=*/false, executor_,
+            /*initialize_read_ranges=*/false, 
/*prefetch_cache_mode=*/PrefetchCacheMode::ALWAYS,
+            CacheConfig(), GetDefaultPool()));
+    // read ranges were never initialized (initialize_read_ranges=false), so the first read must fail
+    ASSERT_NOK_WITH_MSG(reader->NextBatchWithBitmap(),
+                        "prefetch reader read ranges are not initialized");
+    reader->Close();
+}
+
+TEST_F(PrefetchFileBatchReaderImplTest, FilterReadRangesWithoutBitmap) {  // no bitmap (std::nullopt): the ranges must come back unchanged
+    std::vector<std::pair<uint64_t, uint64_t>> read_ranges = {
+        {0, 1000},    {1000, 2000}, {2000, 3000}, {3000, 4000}, {4000, 5000},
+        {5000, 6000}, {6000, 7000}, {7000, 8000}, {8000, 9000}, {9000, 10000}};
+    auto filtered_ranges = 
PrefetchFileBatchReaderImpl::FilterReadRanges(read_ranges, std::nullopt);
+    ASSERT_EQ(filtered_ranges, read_ranges);
+}
+
+TEST_F(PrefetchFileBatchReaderImplTest, FilterReadRangesWithAllZeroBitmap) {  // empty bitmap: every range must be filtered out
+    std::vector<std::pair<uint64_t, uint64_t>> read_ranges = {
+        {0, 1000},    {1000, 2000}, {2000, 3000}, {3000, 4000}, {4000, 5000},
+        {5000, 6000}, {6000, 7000}, {7000, 8000}, {8000, 9000}, {9000, 10000}};
+    auto bitmap = RoaringBitmap32::From({});
+    auto filtered_ranges = 
PrefetchFileBatchReaderImpl::FilterReadRanges(read_ranges, bitmap);
+    ASSERT_TRUE(filtered_ranges.empty());
+}
+
+TEST_F(PrefetchFileBatchReaderImplTest, FilterReadRangesWithBitmap) {  // rows [1000, 2000) and [3000, 6500) are selected: only ranges containing a selected row must survive
+    auto data_array = PrepareArray(10000);  // NOTE(review): data_array is not referenced again in this test
+    std::set<int32_t> valid_row_ids;
+    for (int32_t i = 1000; i < 2000; i++) {
+        valid_row_ids.insert(i);
+    }
+    for (int32_t i = 3000; i < 6500; i++) {
+        valid_row_ids.insert(i);
+    }
+    std::vector<int32_t> bitmap_data(valid_row_ids.begin(), 
valid_row_ids.end());
+    auto bitmap = RoaringBitmap32::From(bitmap_data);
+    std::vector<std::pair<uint64_t, uint64_t>> read_ranges = {
+        {0, 1000},    {1000, 2000}, {2000, 3000}, {3000, 4000}, {4000, 5000},
+        {5000, 6000}, {6000, 7000}, {7000, 8000}, {8000, 9000}, {9000, 10000}};
+    auto filtered_ranges = 
PrefetchFileBatchReaderImpl::FilterReadRanges(read_ranges, bitmap);
+    std::vector<std::pair<uint64_t, uint64_t>> expected_filtered_ranges = {
+        {1000, 2000}, {3000, 4000}, {4000, 5000}, {5000, 6000}, {6000, 7000}};  // {6000, 7000} is kept because rows 6000..6499 are selected
+    ASSERT_EQ(expected_filtered_ranges, filtered_ranges);
+}
+
+TEST_F(PrefetchFileBatchReaderImplTest, DispatchReadRangesEmpty) {  // no input ranges: the result must still contain 3 groups, all empty
+    std::vector<std::pair<uint64_t, uint64_t>> read_ranges;
+    auto read_ranges_in_group = 
PrefetchFileBatchReaderImpl::DispatchReadRanges(read_ranges, 3);
+    ASSERT_EQ(read_ranges_in_group.size(), 3);
+    ASSERT_TRUE(read_ranges_in_group[0].empty());
+    ASSERT_TRUE(read_ranges_in_group[1].empty());
+    ASSERT_TRUE(read_ranges_in_group[2].empty());
+}
+
+TEST_F(PrefetchFileBatchReaderImplTest, DispatchReadRanges) {  // 4 ranges over 3 groups: range i is expected in group i % 3
+    std::vector<std::pair<uint64_t, uint64_t>> read_ranges = {
+        {0, 10000}, {10000, 20000}, {20000, 30000}, {30000, 40000}};
+    auto read_ranges_in_group = 
PrefetchFileBatchReaderImpl::DispatchReadRanges(read_ranges, 3);
+    std::vector<std::pair<uint64_t, uint64_t>> expected_group_0 = {{0, 10000}, 
{30000, 40000}};
+    ASSERT_EQ(read_ranges_in_group[0], expected_group_0);
+    std::vector<std::pair<uint64_t, uint64_t>> expected_group_1 = {{10000, 
20000}};
+    ASSERT_EQ(read_ranges_in_group[1], expected_group_1);
+    std::vector<std::pair<uint64_t, uint64_t>> expected_group_2 = {{20000, 
30000}};
+    ASSERT_EQ(read_ranges_in_group[2], expected_group_2);
+}
+
+TEST_F(PrefetchFileBatchReaderImplTest, RefreshReadRanges) {  // after RefreshReadRanges(), each of the 3 underlying mock readers must hold its share of the ranges
+    auto data_array = PrepareArray(101);
+    int32_t batch_size = 30;
+    int32_t prefetch_max_parallel_num = 3;
+    MockFormatReaderBuilder reader_builder(data_array, data_type_, batch_size);
+    ASSERT_OK_AND_ASSIGN(
+        auto reader,
+        PrefetchFileBatchReaderImpl::Create(
+            /*data_file_path=*/"", &reader_builder, mock_fs_, 
prefetch_max_parallel_num, batch_size,
+            prefetch_max_parallel_num * 2, 
/*enable_adaptive_prefetch_strategy=*/false, executor_,
+            /*initialize_read_ranges=*/false, 
/*prefetch_cache_mode=*/PrefetchCacheMode::ALWAYS,
+            CacheConfig(), GetDefaultPool()));
+    auto prefetch_reader = 
dynamic_cast<PrefetchFileBatchReaderImpl*>(reader.get());  // NOTE(review): Create() is declared to return unique_ptr<PrefetchFileBatchReaderImpl>, so this cast looks redundant — confirm
+    ASSERT_OK(prefetch_reader->RefreshReadRanges());
+    std::vector<std::pair<uint64_t, uint64_t>> read_ranges_0 = {{0, 30}, {90, 
101}};
+    auto mock_reader_0 = 
dynamic_cast<MockFileBatchReader*>(prefetch_reader->readers_[0].get());
+    ASSERT_EQ(mock_reader_0->GetReadRanges(), read_ranges_0);
+    std::vector<std::pair<uint64_t, uint64_t>> read_ranges_1 = {{30, 60}};
+    auto mock_reader_1 = 
dynamic_cast<MockFileBatchReader*>(prefetch_reader->readers_[1].get());
+    ASSERT_EQ(mock_reader_1->GetReadRanges(), read_ranges_1);
+    std::vector<std::pair<uint64_t, uint64_t>> read_ranges_2 = {{60, 90}};
+    auto mock_reader_2 = 
dynamic_cast<MockFileBatchReader*>(prefetch_reader->readers_[2].get());
+    ASSERT_EQ(mock_reader_2->GetReadRanges(), read_ranges_2);
+}
+
+TEST_F(PrefetchFileBatchReaderImplTest, 
RefreshReadRangesDisablePrefetchByAdaptiveStrategy) {  // adaptive strategy enabled and the mock reports a single {0, 100} range: NeedPrefetch() must be false
+    auto data_array = PrepareArray(200);
+    int32_t batch_size = 10;
+    int32_t prefetch_max_parallel_num = 1;
+    ControlledMockFormatReaderBuilder reader_builder(data_array, data_type_, 
batch_size,
+                                                     /*read_ranges=*/{{0, 
100}},
+                                                     /*need_prefetch=*/true,
+                                                     
/*set_read_ranges_statuses=*/{});
+
+    ASSERT_OK_AND_ASSIGN(
+        auto reader,
+        PrefetchFileBatchReaderImpl::Create(
+            /*data_file_path=*/"", &reader_builder, mock_fs_, 
prefetch_max_parallel_num, batch_size,
+            /*prefetch_batch_count=*/2,
+            /*enable_adaptive_prefetch_strategy=*/true, executor_,
+            /*initialize_read_ranges=*/true, 
/*prefetch_cache_mode=*/PrefetchCacheMode::ALWAYS,
+            CacheConfig(), GetDefaultPool()));
+
+    ASSERT_FALSE(reader->NeedPrefetch());
+}
+
+TEST_F(PrefetchFileBatchReaderImplTest, SetReadRanges) {  // SetReadRanges() must record the ranges on the prefetch reader and distribute them to the 3 underlying mock readers
+    auto data_array = PrepareArray(400);
+    int32_t batch_size = 30;
+    int32_t prefetch_max_parallel_num = 3;
+    MockFormatReaderBuilder reader_builder(data_array, data_type_, batch_size);
+    ASSERT_OK_AND_ASSIGN(
+        auto reader,
+        PrefetchFileBatchReaderImpl::Create(
+            /*data_file_path=*/"", &reader_builder, mock_fs_, 
prefetch_max_parallel_num, batch_size,
+            prefetch_max_parallel_num * 2, 
/*enable_adaptive_prefetch_strategy=*/false, executor_,
+            /*initialize_read_ranges=*/false, 
/*prefetch_cache_mode=*/PrefetchCacheMode::ALWAYS,
+            CacheConfig(), GetDefaultPool()));
+    auto prefetch_reader = 
dynamic_cast<PrefetchFileBatchReaderImpl*>(reader.get());
+    ASSERT_FALSE(prefetch_reader->need_prefetch_);
+    prefetch_reader->need_prefetch_ = true;  // flag is set by hand; presumably required for SetReadRanges() to push ranges down — confirm
+    std::vector<std::pair<uint64_t, uint64_t>> ranges = {
+        {0, 100}, {100, 200}, {200, 300}, {300, 400}};
+    ASSERT_OK(prefetch_reader->SetReadRanges(ranges));
+    auto& read_ranges_queue = prefetch_reader->read_ranges_;
+    std::vector<std::pair<uint64_t, uint64_t>> read_ranges;
+    for (auto& iter : read_ranges_queue) {
+        read_ranges.push_back(iter);
+    }
+    ranges.emplace_back(400, 401);  // read_ranges_ is expected to carry one extra trailing range {400, 401}
+    ASSERT_EQ(read_ranges, ranges);
+
+    std::vector<std::pair<uint64_t, uint64_t>> read_ranges_0 = {{0, 100}, 
{300, 400}};
+    auto mock_reader_0 = 
dynamic_cast<MockFileBatchReader*>(prefetch_reader->readers_[0].get());
+    ASSERT_EQ(mock_reader_0->GetReadRanges(), read_ranges_0);
+    std::vector<std::pair<uint64_t, uint64_t>> read_ranges_1 = {{100, 200}};
+    auto mock_reader_1 = 
dynamic_cast<MockFileBatchReader*>(prefetch_reader->readers_[1].get());
+    ASSERT_EQ(mock_reader_1->GetReadRanges(), read_ranges_1);
+    std::vector<std::pair<uint64_t, uint64_t>> read_ranges_2 = {{200, 300}};
+    auto mock_reader_2 = 
dynamic_cast<MockFileBatchReader*>(prefetch_reader->readers_[2].get());
+    ASSERT_EQ(mock_reader_2->GetReadRanges(), read_ranges_2);
+}
+
+TEST_F(PrefetchFileBatchReaderImplTest, 
SetReadRangesReturnErrorWhenPushDownFailed) {  // the second reader built is given an IOError status; SetReadRanges() on the prefetch reader must surface it
+    auto data_array = PrepareArray(100);
+    int32_t batch_size = 10;
+    int32_t prefetch_max_parallel_num = 2;
+    ControlledMockFormatReaderBuilder reader_builder(
+        data_array, data_type_, batch_size,
+        /*read_ranges=*/{{0, 50}, {50, 100}},
+        /*need_prefetch=*/true,
+        /*set_read_ranges_statuses=*/{Status::OK(), Status::IOError("set read 
ranges failed")});
+
+    ASSERT_OK_AND_ASSIGN(
+        auto reader,
+        PrefetchFileBatchReaderImpl::Create(
+            /*data_file_path=*/"", &reader_builder, mock_fs_, 
prefetch_max_parallel_num, batch_size,
+            prefetch_max_parallel_num * 2,
+            /*enable_adaptive_prefetch_strategy=*/false, executor_,
+            /*initialize_read_ranges=*/false, 
/*prefetch_cache_mode=*/PrefetchCacheMode::ALWAYS,
+            CacheConfig(), GetDefaultPool()));
+
+    auto prefetch_reader = 
dynamic_cast<PrefetchFileBatchReaderImpl*>(reader.get());
+    prefetch_reader->need_prefetch_ = true;
+
+    Status status = prefetch_reader->SetReadRanges({{0, 50}, {50, 100}});
+    ASSERT_FALSE(status.ok());
+    ASSERT_TRUE(status.IsIOError());
+}
+
+TEST_F(PrefetchFileBatchReaderImplTest, NeedInitCacheNeverMode) {  // with PrefetchCacheMode::NEVER, NeedInitCache() must be false
+    auto data_array = PrepareArray(10);
+    int32_t batch_size = 5;
+    int32_t prefetch_max_parallel_num = 1;
+    MockFormatReaderBuilder reader_builder(data_array, data_type_, batch_size);
+    ASSERT_OK_AND_ASSIGN(
+        auto reader,
+        PrefetchFileBatchReaderImpl::Create(
+            /*data_file_path=*/"", &reader_builder, mock_fs_, 
prefetch_max_parallel_num, batch_size,
+            prefetch_max_parallel_num * 2,
+            /*enable_adaptive_prefetch_strategy=*/false, executor_,
+            /*initialize_read_ranges=*/false, 
/*prefetch_cache_mode=*/PrefetchCacheMode::NEVER,
+            CacheConfig(), GetDefaultPool()));
+
+    auto prefetch_reader = 
dynamic_cast<PrefetchFileBatchReaderImpl*>(reader.get());
+    ASSERT_FALSE(prefetch_reader->NeedInitCache());
+}
+
+TEST_F(PrefetchFileBatchReaderImplTest, 
WorkloopSetReadStatusWhenCacheInitFailed) {  // a cache config with range_size_limit == UINT32_MAX + 1 is expected to be rejected: Workloop() must leave an Invalid read status
+    auto data_array = PrepareArray(10);
+    int32_t batch_size = 5;
+    int32_t prefetch_max_parallel_num = 1;
+    MockFormatReaderBuilder reader_builder(data_array, data_type_, batch_size);
+    CacheConfig invalid_cache_config(
+        /*buffer_size_limit=*/512 * 1024,
+        
/*range_size_limit=*/static_cast<uint64_t>(std::numeric_limits<uint32_t>::max())
 + 1,
+        /*hole_size_limit=*/8 * 1024,
+        /*pre_buffer_limit=*/128 * 1024);
+
+    ASSERT_OK_AND_ASSIGN(
+        auto reader,
+        PrefetchFileBatchReaderImpl::Create(
+            /*data_file_path=*/"", &reader_builder, mock_fs_, 
prefetch_max_parallel_num, batch_size,
+            prefetch_max_parallel_num * 2,
+            /*enable_adaptive_prefetch_strategy=*/false, executor_,
+            /*initialize_read_ranges=*/false, 
/*prefetch_cache_mode=*/PrefetchCacheMode::ALWAYS,
+            invalid_cache_config, GetDefaultPool()));
+
+    auto prefetch_reader = 
dynamic_cast<PrefetchFileBatchReaderImpl*>(reader.get());
+    prefetch_reader->Workloop();  // invoked directly on the test thread
+
+    Status status = prefetch_reader->GetReadStatus();
+    ASSERT_FALSE(status.ok());
+    ASSERT_TRUE(status.IsInvalid());
+}
+
+TEST_F(PrefetchFileBatchReaderImplTest, DoReadBatchReturnOkWhenShutdown) {  // with is_shutdown_ set, DoReadBatch() must return OK
+    auto data_array = PrepareArray(10);
+    int32_t batch_size = 5;
+    int32_t prefetch_max_parallel_num = 1;
+    MockFormatReaderBuilder reader_builder(data_array, data_type_, batch_size);
+    ASSERT_OK_AND_ASSIGN(
+        auto reader,
+        PrefetchFileBatchReaderImpl::Create(
+            /*data_file_path=*/"", &reader_builder, mock_fs_, 
prefetch_max_parallel_num, batch_size,
+            prefetch_max_parallel_num * 2,
+            /*enable_adaptive_prefetch_strategy=*/false, executor_,
+            /*initialize_read_ranges=*/false, 
/*prefetch_cache_mode=*/PrefetchCacheMode::ALWAYS,
+            CacheConfig(), GetDefaultPool()));
+
+    auto prefetch_reader = 
dynamic_cast<PrefetchFileBatchReaderImpl*>(reader.get());
+    prefetch_reader->is_shutdown_ = true;
+    ASSERT_OK(prefetch_reader->DoReadBatch(/*reader_idx=*/0));
+}
+
+TEST_F(PrefetchFileBatchReaderImplTest, 
DoReadBatchReturnOkWhenNoCurrentReadRange) {  // reader 0 is given an empty range group: DoReadBatch() must return OK
+    auto data_array = PrepareArray(10);
+    int32_t batch_size = 5;
+    int32_t prefetch_max_parallel_num = 1;
+    MockFormatReaderBuilder reader_builder(data_array, data_type_, batch_size);
+    ASSERT_OK_AND_ASSIGN(
+        auto reader,
+        PrefetchFileBatchReaderImpl::Create(
+            /*data_file_path=*/"", &reader_builder, mock_fs_, 
prefetch_max_parallel_num, batch_size,
+            prefetch_max_parallel_num * 2,
+            /*enable_adaptive_prefetch_strategy=*/false, executor_,
+            /*initialize_read_ranges=*/false, 
/*prefetch_cache_mode=*/PrefetchCacheMode::ALWAYS,
+            CacheConfig(), GetDefaultPool()));
+
+    auto prefetch_reader = 
dynamic_cast<PrefetchFileBatchReaderImpl*>(reader.get());
+    prefetch_reader->read_ranges_in_group_ = {{}};  // exactly one group, containing no ranges
+    ASSERT_OK(prefetch_reader->DoReadBatch(/*reader_idx=*/0));
+}
+
+TEST_F(PrefetchFileBatchReaderImplTest, TestReadWithLargeBatchSize) {  // batch_size (150) exceeds the row count (101): a full read must still match the source array
+    auto data_array = PrepareArray(101);
+    int32_t batch_size = 150;
+    int32_t prefetch_max_parallel_num = 3;
+    MockFormatReaderBuilder reader_builder(data_array, data_type_, batch_size);
+    ASSERT_OK_AND_ASSIGN(
+        auto reader,
+        PrefetchFileBatchReaderImpl::Create(
+            /*data_file_path=*/"", &reader_builder, mock_fs_, 
prefetch_max_parallel_num, batch_size,
+            prefetch_max_parallel_num * 2, 
/*enable_adaptive_prefetch_strategy=*/false, executor_,
+            /*initialize_read_ranges=*/true, 
/*prefetch_cache_mode=*/PrefetchCacheMode::ALWAYS,
+            CacheConfig(), GetDefaultPool()));
+    ASSERT_EQ(reader->GetPreviousBatchFirstRowNumber().value(), -1);  // -1 is expected before any batch has been read
+    ASSERT_OK_AND_ASSIGN(auto result_array,
+                         ReadResultCollector::CollectResult(
+                             reader.get(), /*max simulated data processing 
time*/ 100));
+    ASSERT_EQ(reader->GetPreviousBatchFirstRowNumber().value(), 101);  // equals the total row count once everything has been collected
+    auto expected_array = std::make_shared<arrow::ChunkedArray>(data_array);
+    ASSERT_TRUE(result_array->Equals(expected_array));
+}
+
+TEST_F(PrefetchFileBatchReaderImplTest, TestPartialReaderSuccessRead) {
+    auto data_array = PrepareArray(101);
+    int32_t batch_size = 10;
+    int32_t prefetch_max_parallel_num = 3;
+    MockFormatReaderBuilder reader_builder(data_array, data_type_, batch_size);
+    ASSERT_OK_AND_ASSIGN(
+        auto reader,
+        PrefetchFileBatchReaderImpl::Create(
+            /*data_file_path=*/"", &reader_builder, mock_fs_, 
prefetch_max_parallel_num, batch_size,
+            prefetch_max_parallel_num, 
/*enable_adaptive_prefetch_strategy=*/false, executor_,
+            /*initialize_read_ranges=*/true, 
/*prefetch_cache_mode=*/PrefetchCacheMode::ALWAYS,
+            CacheConfig(), GetDefaultPool()));
+    auto prefetch_reader = 
dynamic_cast<PrefetchFileBatchReaderImpl*>(reader.get());
+    for (int32_t i = 0; i < prefetch_max_parallel_num; i++) {
+        dynamic_cast<MockFileBatchReader*>(prefetch_reader->readers_[i].get())
+            ->EnableRandomizeBatchSize(false);
+    }
+
+    arrow::ArrayVector result_array_vector;
+    ASSERT_EQ(reader->GetPreviousBatchFirstRowNumber().value(), -1);
+    ASSERT_OK_AND_ASSIGN(auto batch_with_bitmap, 
reader->NextBatchWithBitmap());
+    auto& [batch, bitmap] = batch_with_bitmap;
+    ASSERT_EQ(batch.first->length, bitmap.Cardinality());
+    ASSERT_EQ(reader->GetPreviousBatchFirstRowNumber().value(), 0);
+    ASSERT_OK_AND_ASSIGN(auto array, 
ReadResultCollector::GetArray(std::move(batch)));
+    result_array_vector.push_back(array);
+    ASSERT_OK(prefetch_reader->GetReadStatus());
+    usleep(100000);  // sleep 100ms to ensure that the other data has been 
pushed
+    ASSERT_TRUE(HasValue(prefetch_reader->prefetch_queues_));
+
+    // Set IOError for reader[1] after the first NextBatch().
+    // Now the data in prefetch_queues_[0] is [30,39], prefetch_queues_[1] is 
[10,19],
+    // prefetch_queues_[2] is [20,29],
+    // So, the IOError will occur at [40,49].
+    dynamic_cast<MockFileBatchReader*>(prefetch_reader->readers_[1].get())
+        ->SetNextBatchStatus(Status::IOError("mock error"));
+    usleep(100000);

Review Comment:
   `usleep()` is non-standard/POSIX-only and makes tests less portable; it also 
tends to introduce timing flakiness. Prefer `std::this_thread::sleep_for(...)` 
and/or (better) replace sleeps with deterministic synchronization (e.g., 
waiting on a condition variable, latches, or polling with bounded timeout on 
observable state transitions).



##########
src/paimon/common/reader/prefetch_file_batch_reader_impl.cpp:
##########
@@ -0,0 +1,617 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#include "paimon/common/reader/prefetch_file_batch_reader_impl.h"
+
+#include <algorithm>
+#include <chrono>
+#include <future>
+#include <thread>
+
+#include "arrow/array/array_base.h"
+#include "arrow/c/abi.h"
+#include "arrow/c/bridge.h"
+#include "paimon/common/executor/future.h"
+#include "paimon/common/io/cache_input_stream.h"
+#include "paimon/common/metrics/metrics_impl.h"
+#include "paimon/common/reader/reader_utils.h"
+#include "paimon/common/utils/arrow/status_utils.h"
+#include "paimon/common/utils/scope_guard.h"
+#include "paimon/format/reader_builder.h"
+#include "paimon/fs/file_system.h"
+#include "paimon/utils/read_ahead_cache.h"
+
+namespace arrow {
+class Schema;
+}  // namespace arrow
+
+namespace paimon {
+
+Result<std::unique_ptr<PrefetchFileBatchReaderImpl>> 
PrefetchFileBatchReaderImpl::Create(
+    const std::string& data_file_path, const ReaderBuilder* reader_builder,
+    const std::shared_ptr<FileSystem>& fs, uint32_t prefetch_max_parallel_num, 
int32_t batch_size,
+    uint32_t prefetch_batch_count, bool enable_adaptive_prefetch_strategy,
+    const std::shared_ptr<Executor>& executor, bool initialize_read_ranges,
+    PrefetchCacheMode prefetch_cache_mode, const CacheConfig& cache_config,
+    const std::shared_ptr<MemoryPool>& pool) {  // factory: validates the arguments, builds one format reader per parallel slot and wraps them in the impl
+    if (prefetch_max_parallel_num == 0) {  // argument validation: zero counts, non-positive batch size and null collaborators are rejected with Status::Invalid
+        return Status::Invalid("prefetch max parallel num should be greater 
than 0.");
+    }
+    if (prefetch_batch_count == 0) {
+        return Status::Invalid("prefetch batch count should be greater than 
0.");
+    }
+    if (batch_size <= 0) {
+        return Status::Invalid("batch size should be greater than 0.");
+    }
+    if (reader_builder == nullptr) {
+        return Status::Invalid("reader_builder should not be nullptr.");
+    }
+    if (fs == nullptr) {
+        return Status::Invalid("file system should not be nullptr.");
+    }
+    if (executor == nullptr) {
+        return Status::Invalid("executor should not be nullptr.");
+    }
+
+    std::shared_ptr<ReadAheadCache> cache;  // stays null when the cache mode is NEVER
+    if (prefetch_cache_mode != PrefetchCacheMode::NEVER) {
+        PAIMON_ASSIGN_OR_RAISE(std::shared_ptr<InputStream> input_stream, 
fs->Open(data_file_path));
+        cache = std::make_shared<ReadAheadCache>(input_stream, cache_config, 
pool);
+    }
+    std::vector<std::future<Result<std::unique_ptr<FileBatchReader>>>> futures;  // one asynchronous reader build per parallel slot
+    for (uint32_t i = 0; i < prefetch_max_parallel_num; i++) {
+        futures.push_back(Via(executor.get(),
+                              [&fs, &data_file_path, &reader_builder,  // NOTE(review): by-reference captures rely on CollectAll() below waiting for every task before these locals go out of scope - confirm
+                               &cache]() -> 
Result<std::unique_ptr<FileBatchReader>> {
+                                  
PAIMON_ASSIGN_OR_RAISE(std::unique_ptr<InputStream> input_stream,
+                                                         
fs->Open(data_file_path));
+                                  auto cache_input_stream = 
std::make_shared<CacheInputStream>(
+                                      std::move(input_stream), cache);  // every reader opens its own stream; the (possibly null) cache object is shared by all of them
+                                  return 
reader_builder->Build(cache_input_stream);
+                              }));
+    }
+    std::vector<std::shared_ptr<PrefetchFileBatchReader>> readers;
+    for (auto& file_batch_reader : CollectAll(futures)) {
+        if (!file_batch_reader.ok()) {  // the first failed build aborts creation
+            return file_batch_reader.status();
+        }
+        std::shared_ptr<FileBatchReader> reader = 
std::move(file_batch_reader).value();
+        auto prefetch_file_batch_reader =
+            std::dynamic_pointer_cast<PrefetchFileBatchReader>(reader);
+        if (prefetch_file_batch_reader == nullptr) {  // the built reader does not implement PrefetchFileBatchReader
+            return Status::Invalid(
+                "failed to cast to prefetch file batch reader. file format not 
support prefetch");
+        }
+        readers.emplace_back(prefetch_file_batch_reader);
+    }
+    if (prefetch_batch_count < readers.size()) {  // raise the total budget so the division below yields at least 1 per reader
+        prefetch_batch_count = readers.size();
+    }
+    uint32_t prefetch_queue_capacity = prefetch_batch_count / readers.size();  // integer division: per-reader queue capacity
+
+    auto reader = std::unique_ptr<PrefetchFileBatchReaderImpl>(new 
PrefetchFileBatchReaderImpl(
+        readers, batch_size, prefetch_queue_capacity, 
enable_adaptive_prefetch_strategy, executor,
+        cache, prefetch_cache_mode));
+    if (initialize_read_ranges) {
+        // normally initialize read ranges should be false, as set read schema 
will refresh read
+        // ranges, and set read schema will always be called before read.
+        PAIMON_RETURN_NOT_OK(reader->RefreshReadRanges());
+    }
+    return reader;
+}
+
+// Stores the collaborators handed over by Create() and allocates the per-reader bookkeeping:
+// one prefetch queue, one atomic row position (starting at 0) and one "is working" flag
+// (starting at false) for every reader.
+PrefetchFileBatchReaderImpl::PrefetchFileBatchReaderImpl(
+    const std::vector<std::shared_ptr<PrefetchFileBatchReader>>& readers, int32_t batch_size,
+    uint32_t prefetch_queue_capacity, bool enable_adaptive_prefetch_strategy,
+    const std::shared_ptr<Executor>& executor, const std::shared_ptr<ReadAheadCache>& cache,
+    PrefetchCacheMode cache_mode)
+    // `readers` is a const reference, so std::move() on it could only ever select the copy
+    // constructor; copy explicitly instead of suggesting a move that never happens.
+    : readers_(readers),
+      batch_size_(batch_size),
+      executor_(executor),
+      cache_(cache),
+      cache_mode_(cache_mode),
+      prefetch_queue_capacity_(prefetch_queue_capacity),
+      enable_adaptive_prefetch_strategy_(enable_adaptive_prefetch_strategy) {
+    for (size_t i = 0; i < readers_.size(); i++) {
+        prefetch_queues_.emplace_back(std::make_unique<ThreadsafeQueue<PrefetchBatch>>());
+        readers_pos_.emplace_back(std::make_unique<std::atomic<uint64_t>>(0));
+        reader_is_working_.emplace_back(false);
+    }
+    parallel_num_ = readers_.size();
+}
+
+PrefetchFileBatchReaderImpl::~PrefetchFileBatchReaderImpl() {
+    (void)CleanUp();  // stops the background thread and drains the queues; the returned Status is deliberately discarded
+}
+
+Status PrefetchFileBatchReaderImpl::SetReadSchema(
+    ::ArrowSchema* read_schema, const std::shared_ptr<Predicate>& predicate,
+    const std::optional<RoaringBitmap32>& selection_bitmap) {  // applies schema, predicate and bitmap to every underlying reader, then rebuilds the read ranges
+    PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(std::shared_ptr<arrow::Schema> schema,
+                                      arrow::ImportSchema(read_schema));  // imported once; each reader below gets its own re-exported C struct
+    for (const auto& reader : readers_) {
+        auto c_schema = std::make_unique<::ArrowSchema>();
+        PAIMON_RETURN_NOT_OK_FROM_ARROW(arrow::ExportSchema(*schema, 
c_schema.get()));
+        PAIMON_RETURN_NOT_OK(reader->SetReadSchema(c_schema.get(), predicate, 
selection_bitmap));  // NOTE(review): presumably the reader takes over the exported struct - confirm it is released on the error path too
+    }
+    selection_bitmap_ = selection_bitmap;  // kept for FilterReadRanges() and NeedInitCache()
+    predicate_ = predicate;  // kept for NeedInitCache()
+    return RefreshReadRanges();
+}
+
+// Resets all prefetch state, asks the first reader for the read ranges, decides whether
+// prefetching should be used, and installs the ranges (filtered by the selection bitmap).
+// Returns the first error reported by CleanUp(), GenReadRanges() or SetReadRanges().
+Status PrefetchFileBatchReaderImpl::RefreshReadRanges() {
+    PAIMON_RETURN_NOT_OK(CleanUp());
+    // Initialised so that a GenReadRanges() implementation which leaves the out-parameter
+    // untouched cannot lead to an indeterminate read below.
+    bool need_prefetch = false;
+    PAIMON_ASSIGN_OR_RAISE(auto read_ranges, readers_[0]->GenReadRanges(&need_prefetch));
+
+    if (!enable_adaptive_prefetch_strategy_) {
+        // Without the adaptive strategy the reader's hint is ignored: always prefetch.
+        need_prefetch = true;
+    } else if (need_prefetch && !read_ranges.empty()) {
+        // Adaptive strategy: when the first range alone holds more batches than one prefetch
+        // queue can buffer, prefetching is switched off.
+        uint64_t batch_count_in_range =
+            (read_ranges[0].second - read_ranges[0].first) / batch_size_;
+        if (batch_count_in_range > static_cast<uint64_t>(prefetch_queue_capacity_)) {
+            need_prefetch = false;
+        }
+    }
+
+    need_prefetch_ = need_prefetch;
+    PAIMON_RETURN_NOT_OK(SetReadRanges(FilterReadRanges(read_ranges, selection_bitmap_)));
+    // NextBatchWithBitmap() refuses to run until this flag is set.
+    read_ranges_freshed_ = true;
+
+    return Status::OK();
+}
+
+// Keeps only the read ranges for which the selection bitmap reports at least one hit
+// (ContainsAny(first, second)). Without a bitmap the input is returned unchanged.
+std::vector<std::pair<uint64_t, uint64_t>> PrefetchFileBatchReaderImpl::FilterReadRanges(
+    const std::vector<std::pair<uint64_t, uint64_t>>& read_ranges,
+    const std::optional<RoaringBitmap32>& selection_bitmap) {
+    if (!selection_bitmap.has_value()) {
+        return read_ranges;
+    }
+    const auto& selected_rows = selection_bitmap.value();
+    std::vector<std::pair<uint64_t, uint64_t>> kept_ranges;
+    for (size_t idx = 0; idx < read_ranges.size(); ++idx) {
+        const auto& candidate = read_ranges[idx];
+        if (!selected_rows.ContainsAny(candidate.first, candidate.second)) {
+            continue;
+        }
+        kept_ranges.push_back(candidate);
+    }
+    return kept_ranges;
+}
+
+Status PrefetchFileBatchReaderImpl::SetReadRanges(
+    const std::vector<std::pair<uint64_t, uint64_t>>& read_ranges) {  // splits the ranges across readers, optionally pushes them down, and appends the EOF sentinel range
+    // push down read ranges for reducing IO amplification
+    read_ranges_in_group_ = DispatchReadRanges(read_ranges, readers_.size());  // round-robin: range i goes to reader i % readers_.size()
+    if (need_prefetch_ && readers_.size() > 1) {
+        // if prefetching isn't necessary, then setting read ranges won't be 
needed either.
+        std::vector<std::future<Status>> futures;
+        for (size_t i = 0; i < readers_.size(); i++) {
+            futures.push_back(Via(executor_.get(), [this, i]() -> Status {  // each reader receives its own group, scheduled on the executor
+                return readers_[i]->SetReadRanges(read_ranges_in_group_[i]);
+            }));
+        }
+        for (const auto& status : CollectAll(futures)) {  // the first non-OK status is returned
+            if (!status.ok()) {
+                return status;
+            }
+        }
+    }
+    for (const auto& read_range : read_ranges) {  // read_ranges_ keeps the overall order in which ranges are handed to the caller
+        read_ranges_.push_back(read_range);
+    }
+    // Note: add a special read range out of file row count, for trigger an 
EOF access.
+    std::pair<uint64_t, uint64_t> eof_range;
+    PAIMON_ASSIGN_OR_RAISE(eof_range, EofRange());
+    read_ranges_.push_back(eof_range);
+    for (auto& read_ranges : read_ranges_in_group_) {  // this loop variable shadows the parameter; the sentinel is appended only after the push-down above
+        read_ranges.push_back(eof_range);
+    }
+    return Status::OK();
+}
+
+// Distributes the read ranges round-robin over `group_count` groups: the range at index i
+// lands in group i % group_count, so the relative order inside each group is preserved.
+std::vector<std::vector<std::pair<uint64_t, uint64_t>>>
+PrefetchFileBatchReaderImpl::DispatchReadRanges(
+    const std::vector<std::pair<uint64_t, uint64_t>>& read_ranges, size_t group_count) {
+    std::vector<std::vector<std::pair<uint64_t, uint64_t>>> groups(group_count);
+    size_t range_index = 0;
+    for (const auto& range : read_ranges) {
+        groups[range_index % group_count].push_back(range);
+        ++range_index;
+    }
+    return groups;
+}
+
+Status PrefetchFileBatchReaderImpl::CleanUp() {  // stops the background work loop and resets ranges, queues, positions, cache and read status
+    auto clean_prefetch_queue = [this]() {  // drains every prefetch queue and releases the batches still held in it
+        for (auto& prefetch_queue : prefetch_queues_) {
+            while (true) {
+                std::optional<PrefetchBatch> batch = prefetch_queue->try_pop();
+                {
+                    std::unique_lock<std::mutex> lock(working_mutex_);
+                    cv_.notify_one();  // cv_ is notified after every pop attempt, including the final empty one
+                }
+                if (batch == std::nullopt) {
+                    break;
+                }
+                
ReaderUtils::ReleaseReadBatch(std::move(batch.value().batch.first));
+            }
+        }
+    };
+    // Clear the existing read ranges and prefetch queue
+    {
+        std::unique_lock<std::mutex> lock(working_mutex_);
+        is_shutdown_ = true;  // set is shutdown and check shutdown to avoid 
block at queue.push
+        cv_.notify_one();
+    }
+    // Join and reset the background thread if it exists
+    if (background_thread_) {
+        if (background_thread_->joinable()) {
+            background_thread_->join();
+            background_thread_.reset();
+        } else {
+            return Status::Invalid("background thread is not joinable");  // NOTE(review): this early return leaves is_shutdown_ set to true
+        }
+    }
+
+    read_ranges_.clear();
+    read_ranges_in_group_.clear();
+    clean_prefetch_queue();
+    for (size_t i = 0; i < readers_pos_.size(); i++) {  // rewind every reader position and clear its working flag
+        readers_pos_[i]->store(0);
+        reader_is_working_[i] = false;
+    }
+    is_shutdown_ = false;  // re-arm: Workloop() and NextBatchWithBitmap() bail out while this flag is true
+    if (cache_) {
+        cache_->Reset();
+    }
+    SetReadStatus(Status::OK());  // clears any error recorded by a previous read
+    return Status::OK();
+}
+
+// Decides, from the cache mode and from whether a predicate / selection bitmap is currently
+// set, if the read-ahead cache should be initialised. Unknown modes trip an assert in debug
+// builds and answer true otherwise.
+bool PrefetchFileBatchReaderImpl::NeedInitCache() const {
+    const bool without_predicate = (predicate_ == nullptr);
+    const bool without_bitmap = (selection_bitmap_ == std::nullopt);
+
+    if (cache_mode_ == PrefetchCacheMode::NEVER) {
+        return false;
+    }
+    if (cache_mode_ == PrefetchCacheMode::EXCLUDE_PREDICATE) {
+        return without_predicate;
+    }
+    if (cache_mode_ == PrefetchCacheMode::EXCLUDE_BITMAP) {
+        return without_bitmap;
+    }
+    if (cache_mode_ == PrefetchCacheMode::EXCLUDE_BITMAP_OR_PREDICATE) {
+        return without_predicate && without_bitmap;
+    }
+    if (cache_mode_ == PrefetchCacheMode::ALWAYS) {
+        return true;
+    }
+    assert(false);
+    return true;
+}
+
+void PrefetchFileBatchReaderImpl::Workloop() {  // body of background_thread_: keeps at most one ReadBatch task in flight per reader until done, failed or shut down
+    std::vector<std::future<void>> futures;
+    futures.resize(readers_.size());  // slot i holds the in-flight task of reader i; a default-constructed future means "nothing scheduled yet"
+    if (cache_ && NeedInitCache()) {
+        auto read_ranges = readers_[0]->PreBufferRange();
+        if (read_ranges.ok()) {
+            std::vector<ByteRange> ranges;
+            for (const auto& read_range : read_ranges.value()) {
+                ranges.emplace_back(read_range.first, read_range.second);
+            }
+            auto s = cache_->Init(std::move(ranges));
+            if (!s.ok()) {
+                SetReadStatus(s);  // a cache init failure is recorded; the loop below exits on its first status check
+            }
+        } else {
+            SetReadStatus(read_ranges.status());
+        }
+    }
+
+    while (true) {
+        if (!GetReadStatus().ok()) {  // stop scheduling once any error has been recorded
+            break;
+        }
+        if (is_shutdown_) {
+            break;
+        }
+        bool all_finished = true;
+        for (const auto& reader_pos : readers_pos_) {
+            if (reader_pos->load() != std::numeric_limits<uint64_t>::max()) {  // uint64 max is the "reader finished" marker stored by HandleReadResult() on EOF
+                all_finished = false;
+            }
+        }
+        if (all_finished) {
+            break;
+        }
+
+        bool made_progress_this_iteration = false;
+        for (size_t reader_idx = 0; reader_idx < readers_.size(); 
reader_idx++) {
+            if (!futures[reader_idx].valid() ||
+                (futures[reader_idx].wait_for(std::chrono::microseconds(0)) ==
+                 std::future_status::ready)) {  // slot is free: never scheduled, or the previous task has completed
+                if (futures[reader_idx].valid()) {
+                    futures[reader_idx].get();  // consume the finished task so the slot becomes invalid again
+                }
+                if (prefetch_queues_[reader_idx]->size() >= 
prefetch_queue_capacity_) {
+                    // queue is full, skip
+                    continue;
+                }
+                if (readers_pos_[reader_idx]->load() != 
std::numeric_limits<uint64_t>::max()) {
+                    futures[reader_idx] =
+                        Via(executor_.get(), [this, reader_idx]() { 
ReadBatch(reader_idx); });
+                    made_progress_this_iteration = true;
+                }
+            }
+        }
+        if (!made_progress_this_iteration) {  // nothing could be scheduled: block until shutdown or until some reader becomes schedulable
+            std::unique_lock<std::mutex> lock(working_mutex_);
+            cv_.wait(lock, [this] {
+                if (is_shutdown_) {
+                    return true;
+                }
+                for (size_t i = 0; i < reader_is_working_.size(); i++) {  // schedulable = idle, queue below capacity, and not finished
+                    if (reader_is_working_[i]) {
+                        continue;
+                    }
+                    if (prefetch_queues_[i]->size() >= 
prefetch_queue_capacity_) {
+                        continue;
+                    }
+                    if (readers_pos_[i]->load() == 
std::numeric_limits<uint64_t>::max()) {
+                        continue;
+                    }
+                    return true;
+                }
+                return false;
+            });
+        }
+    }
+    Wait(futures);  // presumably blocks until the still-outstanding tasks have completed - confirm against the Wait() helper
+}
+
+// Executor task entry point: runs one DoReadBatch() for the given reader and records a
+// failure through SetReadStatus() instead of returning it.
+void PrefetchFileBatchReaderImpl::ReadBatch(size_t reader_idx) {
+    if (Status read_result = DoReadBatch(reader_idx); !read_result.ok()) {
+        SetReadStatus(read_result);
+    }
+}
+
+// Returns the first range in the reader's group whose end lies beyond the reader's current
+// position, or std::nullopt when the position is at or past the end of every range.
+std::optional<std::pair<uint64_t, uint64_t>> PrefetchFileBatchReaderImpl::GetCurrentReadRange(
+    size_t reader_idx) const {
+    const auto& group_ranges = read_ranges_in_group_[reader_idx];
+    const uint64_t position = readers_pos_[reader_idx]->load();
+
+    const auto match = std::find_if(group_ranges.begin(), group_ranges.end(),
+                                    [position](const std::pair<uint64_t, uint64_t>& range) {
+                                        return position < range.second;
+                                    });
+    if (match == group_ranges.end()) {
+        return std::nullopt;
+    }
+    return *match;
+}
+
+// Makes sure the reader will continue at max(recorded position, start of the current range);
+// a seek is issued only when the reader's next row differs from that target.
+Status PrefetchFileBatchReaderImpl::EnsureReaderPosition(
+    size_t reader_idx, const std::pair<uint64_t, uint64_t>& current_read_range) const {
+    const uint64_t recorded_pos = readers_pos_[reader_idx]->load();
+    uint64_t target_row = std::max(recorded_pos, current_read_range.first);
+    const auto& reader = readers_[reader_idx];
+    if (reader->GetNextRowToRead() == target_row) {
+        return Status::OK();
+    }
+    return reader->SeekToRow(target_row);
+}
+
+Status PrefetchFileBatchReaderImpl::HandleReadResult(
+    size_t reader_idx, const std::pair<uint64_t, uint64_t>& read_range,
+    ReadBatchWithBitmap&& read_batch_with_bitmap) {  // clips a freshly read batch to read_range, advances the reader position, and enqueues the batch (or the EOF marker)
+    PAIMON_ASSIGN_OR_RAISE(uint64_t first_row_number,
+                           
readers_[reader_idx]->GetPreviousBatchFirstRowNumber());
+    auto& prefetch_queue = prefetch_queues_[reader_idx];
+    if (!BatchReader::IsEofBatch(read_batch_with_bitmap)) {
+        auto& [read_batch, bitmap] = read_batch_with_bitmap;
+        auto& [c_array, c_schema] = read_batch;
+
+        if (first_row_number >= read_range.second) {
+            // fully out of range, data before first_row_number has been 
filtered out
+            readers_pos_[reader_idx]->store(first_row_number);
+            ReaderUtils::ReleaseReadBatch(std::move(read_batch));  // the batch is dropped, nothing is enqueued
+            return Status::OK();
+        } else if (first_row_number + c_array->length > read_range.second) {
+            // partially out of range, data before read_range.second has been 
effectively consumed
+            readers_pos_[reader_idx]->store(read_range.second);
+            PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(std::shared_ptr<arrow::Array> 
src_array,
+                                              
arrow::ImportArray(c_array.get(), c_schema.get()));
+            int32_t target_length = read_range.second - first_row_number;  // NOTE(review): a uint64_t difference is narrowed to int32_t here
+            auto array = src_array->Slice(/*offset=*/0, target_length);  // keep only the rows that fall inside the range
+            PAIMON_RETURN_NOT_OK_FROM_ARROW(
+                arrow::ExportArray(*array, c_array.get(), c_schema.get()));  // the sliced array is exported back into the same C structs
+            bitmap.RemoveRange(target_length, src_array->length());  // the bitmap entries for the cut-off tail are removed as well
+        } else {
+            // all within the range, data before 
readers_[reader_idx]->GetNextRowToRead() has been
+            // effectively consumed
+            
readers_pos_[reader_idx]->store(readers_[reader_idx]->GetNextRowToRead());
+        }
+        if (bitmap.IsEmpty()) {  // a batch whose bitmap is empty is released instead of being enqueued
+            ReaderUtils::ReleaseReadBatch(std::move(read_batch));
+            return Status::OK();
+        }
+        prefetch_queue->push({read_range, std::move(read_batch_with_bitmap), 
first_row_number});
+    } else {
+        std::pair<uint64_t, uint64_t> eof_range;
+        PAIMON_ASSIGN_OR_RAISE(eof_range, EofRange());
+        prefetch_queue->push({eof_range, std::move(read_batch_with_bitmap), 
first_row_number});  // the EOF batch is queued under the sentinel EOF range
+        readers_pos_[reader_idx]->store(std::numeric_limits<uint64_t>::max());  // uint64 max marks this reader as finished for Workloop()
+    }
+    return Status::OK();
+}
+
+Status PrefetchFileBatchReaderImpl::DoReadBatch(size_t reader_idx) {  // reads one batch from the given reader inside its current read range and passes it to HandleReadResult()
+    PAIMON_RETURN_NOT_OK(GetReadStatus());  // do nothing further once an error has been recorded
+    if (is_shutdown_) {
+        return Status::OK();
+    }
+    std::optional<std::pair<uint64_t, uint64_t>> current_read_range =
+        GetCurrentReadRange(reader_idx);
+    if (current_read_range == std::nullopt) {
+        // No more read ranges for this reader, gracefully exit.
+        return Status::OK();
+    }
+    ScopeGuard guard([&]() {  // on every exit path below: clear the working flag and notify cv_
+        std::unique_lock<std::mutex> lock(working_mutex_);
+        reader_is_working_[reader_idx] = false;
+        cv_.notify_one();
+    });
+    {
+        std::unique_lock<std::mutex> lock(working_mutex_);
+        reader_is_working_[reader_idx] = true;  // Workloop()'s wait predicate skips readers whose flag is set
+    }
+
+    const auto& read_range = current_read_range.value();
+    FileBatchReader* reader = readers_[reader_idx].get();
+    PAIMON_RETURN_NOT_OK(EnsureReaderPosition(reader_idx, read_range));  // seeks only if the reader is not already at the expected row
+
+    PAIMON_ASSIGN_OR_RAISE(ReadBatchWithBitmap read_batch_with_bitmap,
+                           reader->NextBatchWithBitmap());
+
+    return HandleReadResult(reader_idx, read_range, 
std::move(read_batch_with_bitmap));
+}
+
+Result<BatchReader::ReadBatchWithBitmap> 
PrefetchFileBatchReaderImpl::NextBatchWithBitmap() {
+    if (!read_ranges_freshed_) {
+        return Status::Invalid("prefetch reader read ranges are not 
initialized");
+    }
+    if (!background_thread_) {
+        background_thread_ =
+            
std::make_unique<std::thread>(&PrefetchFileBatchReaderImpl::Workloop, this);
+    }
+
+    while (true) {
+        PAIMON_RETURN_NOT_OK(GetReadStatus());
+        if (is_shutdown_) {
+            return Status::Invalid(
+                "prefetch reader has inconsistent state, maybe read while 
closing reader or change "
+                "read schema");
+        }
+        std::optional<std::pair<uint64_t, uint64_t>> min_range;
+        size_t eof_count = 0;
+        size_t value_count = 0;
+        for (auto& prefetch_queue : prefetch_queues_) {
+            PAIMON_RETURN_NOT_OK(GetReadStatus());
+            const PrefetchBatch* peek_batch = prefetch_queue->try_front();
+            if (!peek_batch) {
+                continue;
+            }
+            if (min_range == std::nullopt) {
+                min_range = peek_batch->read_range;
+            } else {
+                if (peek_batch->read_range.first < min_range.value().first) {
+                    min_range = peek_batch->read_range;
+                }
+            }
+            value_count++;
+            PAIMON_ASSIGN_OR_RAISE(bool is_eof_range, 
IsEofRange(peek_batch->read_range));
+            if (is_eof_range) {
+                eof_count++;
+                continue;
+            }
+
+            const auto& current_read_range = read_ranges_.front();
+            if (peek_batch->read_range == current_read_range) {
+                auto prefetch_batch = prefetch_queue->try_pop();
+                {
+                    std::unique_lock<std::mutex> lock(working_mutex_);
+                    cv_.notify_one();
+                }
+                previous_batch_first_row_num_ = 
prefetch_batch.value().previous_batch_first_row_num;
+                return std::move(prefetch_batch).value().batch;
+            }
+        }
+        if (eof_count == prefetch_queues_.size()) {
+            const PrefetchBatch* peek_batch = prefetch_queues_[0]->try_front();
+            if (peek_batch == nullptr) {
+                assert(false);
+                return Status::Invalid("peek batch not suppose to be nullptr");
+            }
+            previous_batch_first_row_num_ = 
peek_batch->previous_batch_first_row_num;
+            return BatchReader::MakeEofBatchWithBitmap();
+        }
+        if (value_count == prefetch_queues_.size()) {
+            while (true) {
+                if (read_ranges_.empty()) {
+                    break;
+                }
+                const auto& current_read_range = read_ranges_.front();
+                if (current_read_range.first < min_range.value().first) {
+                    read_ranges_.pop_front();
+                } else {
+                    break;
+                }
+            }
+        } else {
+            std::this_thread::sleep_for(std::chrono::microseconds(1));
+        }

Review Comment:
   This 1µs sleep in a tight loop is effectively a busy-wait and can burn CPU 
under load or when producers are slower (especially with multiple 
readers/queues). Prefer a blocking wait strategy: e.g., wait on a condition 
variable notified when any queue receives an item (notify in `HandleReadResult` 
after `push`) and/or when `read_ranges_` advances, with a timeout to remain 
responsive to shutdown/error.



##########
src/paimon/common/reader/prefetch_file_batch_reader_impl_test.cpp:
##########
@@ -0,0 +1,914 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#include "paimon/common/reader/prefetch_file_batch_reader_impl.h"
+
+#include <atomic>
+#include <set>
+
+#include "arrow/compute/api.h"
+#include "arrow/ipc/api.h"
+#include "gtest/gtest.h"
+#include "paimon/common/utils/path_util.h"
+#include "paimon/executor.h"
+#include "paimon/format/file_format.h"
+#include "paimon/format/file_format_factory.h"
+#include "paimon/format/format_writer.h"
+#include "paimon/fs/file_system_factory.h"
+#include "paimon/fs/local/local_file_system.h"
+#include "paimon/predicate/predicate_builder.h"
+#include "paimon/testing/mock/mock_file_batch_reader.h"
+#include "paimon/testing/mock/mock_file_system.h"
+#include "paimon/testing/mock/mock_format_reader_builder.h"
+#include "paimon/testing/utils/read_result_collector.h"
+#include "paimon/testing/utils/testharness.h"
+#include "paimon/utils/read_ahead_cache.h"
+
+namespace paimon::test {
+
+// MockFileBatchReader whose GenReadRanges() answer and SetReadRanges() outcome are fixed at
+// construction time, so a test can dictate both.
+class ControlledMockFileBatchReader : public MockFileBatchReader {
+ public:
+    ControlledMockFileBatchReader(const std::shared_ptr<arrow::Array>& data,
+                                  const std::shared_ptr<arrow::DataType>& file_schema,
+                                  int32_t read_batch_size,
+                                  std::vector<std::pair<uint64_t, uint64_t>> read_ranges,
+                                  bool need_prefetch, Status set_read_ranges_status = Status::OK())
+        : MockFileBatchReader(data, file_schema, read_batch_size),
+          read_ranges_(std::move(read_ranges)),
+          need_prefetch_(need_prefetch),
+          set_read_ranges_status_(std::move(set_read_ranges_status)) {}
+
+    // Writes the configured prefetch hint to `need_prefetch` and returns the configured ranges.
+    Result<std::vector<std::pair<uint64_t, uint64_t>>> GenReadRanges(
+        bool* need_prefetch) const override {
+        *need_prefetch = need_prefetch_;
+        return read_ranges_;
+    }
+
+    // Reports the injected status when it is an error; otherwise defers to the base mock.
+    Status SetReadRanges(const std::vector<std::pair<uint64_t, uint64_t>>& read_ranges) override {
+        const bool inject_failure = !set_read_ranges_status_.ok();
+        return inject_failure ? set_read_ranges_status_
+                              : MockFileBatchReader::SetReadRanges(read_ranges);
+    }
+
+ private:
+    std::vector<std::pair<uint64_t, uint64_t>> read_ranges_;
+    bool need_prefetch_ = true;
+    Status set_read_ranges_status_;
+};
+
+// ReaderBuilder test double that produces ControlledMockFileBatchReader instances. The reader
+// created by the N-th Build() call (0-based) is configured with set_read_ranges_statuses[N];
+// calls beyond the end of that list get Status::OK().
+class ControlledMockFormatReaderBuilder : public ReaderBuilder {
+ public:
+    ControlledMockFormatReaderBuilder(const std::shared_ptr<arrow::Array>& data,
+                                      const std::shared_ptr<arrow::DataType>& schema,
+                                      int32_t read_batch_size,
+                                      std::vector<std::pair<uint64_t, uint64_t>> read_ranges,
+                                      bool need_prefetch,
+                                      std::vector<Status> set_read_ranges_statuses)
+        : data_(data),
+          schema_(schema),
+          read_batch_size_(read_batch_size),
+          read_ranges_(std::move(read_ranges)),
+          need_prefetch_(need_prefetch),
+          set_read_ranges_statuses_(std::move(set_read_ranges_statuses)) {}
+
+    // The memory pool is ignored by this mock.
+    ReaderBuilder* WithMemoryPool(const std::shared_ptr<MemoryPool>& pool) override {
+        return this;
+    }
+
+    // Builds the next controlled reader. The input stream is not used by the mock.
+    Result<std::unique_ptr<FileBatchReader>> Build(
+        const std::shared_ptr<InputStream>& path) const override {
+        // PrefetchFileBatchReaderImpl::Create() schedules the Build() calls on executor
+        // threads, so the build index has to be claimed atomically.
+        const size_t index = build_count_.fetch_add(1);
+        Status set_read_ranges_status = index < set_read_ranges_statuses_.size()
+                                            ? set_read_ranges_statuses_[index]
+                                            : Status::OK();
+        return std::make_unique<ControlledMockFileBatchReader>(
+            data_, schema_, read_batch_size_, read_ranges_, need_prefetch_, set_read_ranges_status);
+    }
+
+    // Building from a path is not supported by this mock.
+    Result<std::unique_ptr<FileBatchReader>> Build(const std::string& path) const override {
+        return Status::Invalid("do not support build reader with path in mock format");
+    }
+
+ private:
+    std::shared_ptr<arrow::Array> data_;
+    std::shared_ptr<arrow::DataType> schema_;
+    int32_t read_batch_size_ = 0;
+    std::vector<std::pair<uint64_t, uint64_t>> read_ranges_;
+    bool need_prefetch_ = true;
+    std::vector<Status> set_read_ranges_statuses_;
+    // Number of Build(InputStream) calls so far; atomic because Build() is const and may run
+    // concurrently.
+    mutable std::atomic<size_t> build_count_{0};
+};
+
+struct TestParam {  // One parameterized case: a file format paired with a prefetch cache mode.
+    std::string file_format;  // Format identifier; PrepareTestParam() uses "parquet" and "orc".
+    PrefetchCacheMode cache_mode;  // Cache mode to run the case with.
+};
+
+// Shared fixture for the PrefetchFileBatchReaderImpl tests. SetUp() creates the three-column
+// (utf8, int64, boolean) struct type, a mock and a local file system, a two-thread executor and a
+// unique scratch directory; the helpers build test arrays, write them to a file and open readers.
+class PrefetchFileBatchReaderImplTest : public ::testing::Test,
+                                        public ::testing::WithParamInterface<TestParam> {
+ public:
+    void SetUp() override {
+        fields_ = {arrow::field("f0", arrow::utf8()), arrow::field("f1", arrow::int64()),
+                   arrow::field("f2", arrow::boolean())};
+        data_type_ = arrow::struct_(fields_);
+        mock_fs_ = std::make_shared<MockFileSystem>();
+        local_fs_ = std::make_shared<LocalFileSystem>();
+        executor_ = CreateDefaultExecutor(/*thread_count=*/2);
+        dir_ = ::paimon::test::UniqueTestDirectory::Create();
+        ASSERT_TRUE(dir_);
+    }
+    void TearDown() override {}
+
+    // Builds a struct array with `length` rows. Row i (starting at `offset`) holds
+    // {"str_<i>", i, i % 2 != 0}.
+    std::shared_ptr<arrow::Array> PrepareArray(int32_t length, int32_t offset = 0) {
+        arrow::StructBuilder struct_builder(
+            data_type_, arrow::default_memory_pool(),
+            {std::make_shared<arrow::StringBuilder>(), std::make_shared<arrow::Int64Builder>(),
+             std::make_shared<arrow::BooleanBuilder>()});
+        auto string_builder = static_cast<arrow::StringBuilder*>(struct_builder.field_builder(0));
+        auto big_int_builder = static_cast<arrow::Int64Builder*>(struct_builder.field_builder(1));
+        auto bool_builder = static_cast<arrow::BooleanBuilder*>(struct_builder.field_builder(2));
+        for (int32_t i = 0 + offset; i < length + offset; ++i) {
+            EXPECT_TRUE(struct_builder.Append().ok());
+            EXPECT_TRUE(string_builder->Append("str_" + std::to_string(i)).ok());
+            EXPECT_TRUE(big_int_builder->Append(i).ok());
+            EXPECT_TRUE(bool_builder->Append(static_cast<bool>(i % 2)).ok());
+        }
+        std::shared_ptr<arrow::Array> array;
+        EXPECT_TRUE(struct_builder.Finish(&array).ok());
+        return array;
+    }
+
+    // Writes `array` to "<dir>/file.<file_format_str>" in batches of `row_index_stride` rows.
+    // `row_index_stride` is also passed as the parquet max row-group length and the ORC row
+    // index stride. `stripe_row_count` is currently not used by this helper.
+    void PrepareTestData(const std::string& file_format_str,
+                         const std::shared_ptr<arrow::Array>& array, int32_t stripe_row_count,
+                         int32_t row_index_stride) const {
+        // for simple case, assume that array.length() %  row_index_stride == 0
+        ASSERT_EQ(array->length() % row_index_stride, 0);
+        arrow::Schema schema(array->type()->fields());
+        ::ArrowSchema c_schema;
+        ASSERT_TRUE(arrow::ExportSchema(schema, &c_schema).ok());
+        ASSERT_OK_AND_ASSIGN(
+            std::unique_ptr<FileFormat> file_format,
+            FileFormatFactory::Get(
+                file_format_str,
+                {{"parquet.write.max-row-group-length", std::to_string(row_index_stride)},
+                 {"orc.row.index.stride", std::to_string(row_index_stride)}}));
+
+        ASSERT_OK_AND_ASSIGN(auto writer_builder,
+                             file_format->CreateWriterBuilder(&c_schema, 1024));
+        ASSERT_OK_AND_ASSIGN(
+            std::shared_ptr<OutputStream> out,
+            local_fs_->Create(PathUtil::JoinPath(dir_->Str(), "file." + file_format_str),
+                              /*overwrite=*/false));
+        ASSERT_OK_AND_ASSIGN(auto writer, writer_builder->Build(out, "zstd"));
+
+        int32_t write_batch_count = array->length() / row_index_stride;
+        for (int32_t i = 0; i < write_batch_count; i++) {
+            auto slice = array->Slice(i * row_index_stride, row_index_stride);
+            // Concatenating the single slice yields a standalone copy to export.
+            auto copied_array = arrow::Concatenate({slice}).ValueOr(nullptr);
+            ASSERT_TRUE(copied_array);
+            ::ArrowArray c_array;
+            ASSERT_TRUE(arrow::ExportArray(*copied_array, &c_array).ok());
+            ASSERT_OK(writer->AddBatch(&c_array));
+        }
+        ASSERT_OK(writer->Flush());
+        ASSERT_OK(writer->Finish());
+        ASSERT_OK(out->Flush());
+        ASSERT_OK(out->Close());
+    }
+
+    // Opens "<dir>/file.<format>" with a PrefetchFileBatchReaderImpl (adaptive strategy off, read
+    // ranges not initialized by Create) and applies `read_schema`, `predicate` and
+    // `selection_bitmap` through SetReadSchema().
+    std::unique_ptr<PrefetchFileBatchReaderImpl> PreparePrefetchReader(
+        const std::string& file_format_str, const arrow::Schema* read_schema,
+        const std::shared_ptr<Predicate>& predicate,
+        const std::optional<RoaringBitmap32>& selection_bitmap, int32_t batch_size,
+        int32_t prefetch_max_parallel_num, PrefetchCacheMode cache_mode) const {
+        EXPECT_OK_AND_ASSIGN(std::unique_ptr<FileFormat> file_format,
+                             FileFormatFactory::Get(file_format_str, {}));
+        EXPECT_OK_AND_ASSIGN(auto reader_builder, file_format->CreateReaderBuilder(batch_size));
+        EXPECT_OK_AND_ASSIGN(
+            std::unique_ptr<PrefetchFileBatchReaderImpl> reader,
+            PrefetchFileBatchReaderImpl::Create(
+                PathUtil::JoinPath(dir_->Str(), "file." + file_format->Identifier()),
+                reader_builder.get(), local_fs_, prefetch_max_parallel_num, batch_size,
+                prefetch_max_parallel_num * 2, /*enable_adaptive_prefetch_strategy=*/false,
+                CreateDefaultExecutor(prefetch_max_parallel_num - 1),
+                /*initialize_read_ranges=*/false, cache_mode, CacheConfig(), GetDefaultPool()));
+        std::unique_ptr<ArrowSchema> c_schema = std::make_unique<ArrowSchema>();
+        auto arrow_status = arrow::ExportSchema(*read_schema, c_schema.get());
+        EXPECT_TRUE(arrow_status.ok());
+        EXPECT_OK(reader->SetReadSchema(c_schema.get(), predicate, selection_bitmap));
+        return reader;
+    }
+
+    // Returns true when at least one of the prefetch queues is non-empty.
+    bool HasValue(const std::vector<
+                  std::unique_ptr<ThreadsafeQueue<PrefetchFileBatchReaderImpl::PrefetchBatch>>>&
+                      prefetch_queues) {
+        for (const auto& queue : prefetch_queues) {
+            if (!queue->empty()) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    // Compares two chunked arrays and prints both to stdout when they differ.
+    bool CheckEqual(const std::shared_ptr<arrow::ChunkedArray>& lhs,
+                    const std::shared_ptr<arrow::ChunkedArray>& rhs) {
+        if (lhs->Equals(rhs)) {
+            return true;
+        }
+        // Render the arrays only on mismatch; pretty-printing them for equal inputs is wasted.
+        std::string lhs_str, rhs_str;
+        auto print_option = arrow::PrettyPrintOptions::Defaults();
+        print_option.window = 1000;
+        print_option.container_window = 1000;
+        EXPECT_TRUE(arrow::PrettyPrint(*lhs, print_option, &lhs_str).ok());
+        EXPECT_TRUE(arrow::PrettyPrint(*rhs, print_option, &rhs_str).ok());
+        std::cout << "lhs array: " << lhs_str << ", rhs array: " << rhs_str;
+        return false;
+    }
+
+    // Protected rather than private: TEST_F bodies are compiled as subclasses of this fixture
+    // and read these members directly (data_type_, mock_fs_, executor_, ...).
+ protected:
+    arrow::FieldVector fields_;
+    std::shared_ptr<arrow::DataType> data_type_;
+    std::shared_ptr<FileSystem> mock_fs_;
+    std::shared_ptr<FileSystem> local_fs_;
+    std::unique_ptr<paimon::test::UniqueTestDirectory> dir_;
+    std::shared_ptr<Executor> executor_;
+};
+
+// Returns every (file format, cache mode) combination to run: all five cache modes for
+// "parquet", followed by the same five for "orc" when PAIMON_ENABLE_ORC is defined.
+std::vector<TestParam> PrepareTestParam() {
+    const PrefetchCacheMode cache_modes[] = {
+        PrefetchCacheMode::ALWAYS, PrefetchCacheMode::EXCLUDE_BITMAP,
+        PrefetchCacheMode::EXCLUDE_PREDICATE, PrefetchCacheMode::EXCLUDE_BITMAP_OR_PREDICATE,
+        PrefetchCacheMode::NEVER};
+    std::vector<std::string> formats = {"parquet"};
+#ifdef PAIMON_ENABLE_ORC
+    formats.emplace_back("orc");
+#endif
+    std::vector<TestParam> params;
+    for (const auto& format : formats) {
+        for (const PrefetchCacheMode mode : cache_modes) {
+            params.push_back(TestParam{format, mode});
+        }
+    }
+    return params;
+}
+
+INSTANTIATE_TEST_SUITE_P(TestParam, PrefetchFileBatchReaderImplTest,
+                         ::testing::ValuesIn(PrepareTestParam()));  // parameters: every entry returned by PrepareTestParam()
+/*
+ * Reads a 101-row array through the prefetch reader in batches of 10, once for each listed
+ * prefetch_max_parallel_num, and checks that the collected result equals the input array.
+ */
+TEST_F(PrefetchFileBatchReaderImplTest, TestSimple) {
+    auto data_array = PrepareArray(101);
+    int32_t batch_size = 10;
+    for (auto prefetch_max_parallel_num : {1, 2, 3, 5, 8, 10}) {
+        MockFormatReaderBuilder reader_builder(data_array, data_type_, 
batch_size);
+        ASSERT_OK_AND_ASSIGN(
+            auto reader,
+            PrefetchFileBatchReaderImpl::Create(
+                /*data_file_path=*/"", &reader_builder, mock_fs_, 
prefetch_max_parallel_num,
+                batch_size, prefetch_max_parallel_num * 2,
+                /*enable_adaptive_prefetch_strategy=*/false, executor_,
+                /*initialize_read_ranges=*/true, 
/*prefetch_cache_mode=*/PrefetchCacheMode::ALWAYS,
+                CacheConfig(), GetDefaultPool()));
+        ASSERT_EQ(reader->GetPreviousBatchFirstRowNumber().value(), -1);  // value expected before any batch was read
+        ASSERT_OK_AND_ASSIGN(auto result_array,
+                             ReadResultCollector::CollectResult(
+                                 reader.get(), /*max simulated data processing 
time*/ 100));
+        ASSERT_EQ(reader->GetPreviousBatchFirstRowNumber().value(), 101);  // value expected once all 101 rows were collected
+        auto expected_array = 
std::make_shared<arrow::ChunkedArray>(data_array);
+        ASSERT_TRUE(result_array->Equals(expected_array));
+    }
+}
+/*
+ * Reads only the first 8 batches of a 101-row file (batch size 10), closes the reader early and
+ * checks that GetReaderMetrics() still returns a metrics object.
+ */
+TEST_F(PrefetchFileBatchReaderImplTest, TestReadWithLimits) {
+    auto data_array = PrepareArray(101);
+    int32_t batch_size = 10;
+    int32_t prefetch_max_parallel_num = 12;
+
+    MockFormatReaderBuilder reader_builder(data_array, data_type_, batch_size);
+    ASSERT_OK_AND_ASSIGN(
+        auto reader,
+        PrefetchFileBatchReaderImpl::Create(
+            /*data_file_path=*/"", &reader_builder, mock_fs_, 
prefetch_max_parallel_num, batch_size,
+            prefetch_max_parallel_num * 2, 
/*enable_adaptive_prefetch_strategy=*/false, executor_,
+            /*initialize_read_ranges=*/true, 
/*prefetch_cache_mode=*/PrefetchCacheMode::ALWAYS,
+            CacheConfig(), GetDefaultPool()));
+    // simulate read limits, only read 8 batches
+    for (int32_t i = 0; i < 8; i++) {
+        ASSERT_OK_AND_ASSIGN(BatchReader::ReadBatchWithBitmap 
batch_with_bitmap,
+                             reader->NextBatchWithBitmap());
+        auto& [batch, bitmap] = batch_with_bitmap;
+        ASSERT_EQ(batch.first->length, bitmap.Cardinality());  // the bitmap must have one set bit per row of the batch
+        ASSERT_OK_AND_ASSIGN(std::shared_ptr<arrow::Array> array,
+                             ReadResultCollector::GetArray(std::move(batch)));
+        ASSERT_TRUE(array);
+    }
+    reader->Close();  // closed while batches are still unread
+    // test metrics
+    auto read_metrics = reader->GetReaderMetrics();
+    ASSERT_TRUE(read_metrics);
+}
+/*
+ * Creates the reader with initialize_read_ranges=false and checks that NextBatchWithBitmap()
+ * fails with the "read ranges are not initialized" error.
+ */
+TEST_F(PrefetchFileBatchReaderImplTest, TestReadWithoutInitializeReadRanges) {
+    auto data_array = PrepareArray(101);
+    int32_t batch_size = 10;
+    int32_t prefetch_max_parallel_num = 12;
+
+    MockFormatReaderBuilder reader_builder(data_array, data_type_, batch_size);
+    ASSERT_OK_AND_ASSIGN(
+        auto reader,
+        PrefetchFileBatchReaderImpl::Create(
+            /*data_file_path=*/"", &reader_builder, mock_fs_, 
prefetch_max_parallel_num, batch_size,
+            prefetch_max_parallel_num * 2, 
/*enable_adaptive_prefetch_strategy=*/false, executor_,
+            /*initialize_read_ranges=*/false, 
/*prefetch_cache_mode=*/PrefetchCacheMode::ALWAYS,
+            CacheConfig(), GetDefaultPool()));
+    // read ranges were never initialized, so the very first read must fail
+    ASSERT_NOK_WITH_MSG(reader->NextBatchWithBitmap(),
+                        "prefetch reader read ranges are not initialized");
+    reader->Close();
+}
+/*
+ * Without a selection bitmap (std::nullopt) FilterReadRanges must return the ranges unchanged.
+ */
+TEST_F(PrefetchFileBatchReaderImplTest, FilterReadRangesWithoutBitmap) {
+    std::vector<std::pair<uint64_t, uint64_t>> read_ranges = {
+        {0, 1000},    {1000, 2000}, {2000, 3000}, {3000, 4000}, {4000, 5000},
+        {5000, 6000}, {6000, 7000}, {7000, 8000}, {8000, 9000}, {9000, 10000}};
+    auto filtered_ranges = 
PrefetchFileBatchReaderImpl::FilterReadRanges(read_ranges, std::nullopt);
+    ASSERT_EQ(filtered_ranges, read_ranges);
+}
+/*
+ * With a bitmap that selects no rows at all, FilterReadRanges must drop every range.
+ */
+TEST_F(PrefetchFileBatchReaderImplTest, FilterReadRangesWithAllZeroBitmap) {
+    std::vector<std::pair<uint64_t, uint64_t>> read_ranges = {
+        {0, 1000},    {1000, 2000}, {2000, 3000}, {3000, 4000}, {4000, 5000},
+        {5000, 6000}, {6000, 7000}, {7000, 8000}, {8000, 9000}, {9000, 10000}};
+    auto bitmap = RoaringBitmap32::From({});  // built from an empty list of row ids
+    auto filtered_ranges = 
PrefetchFileBatchReaderImpl::FilterReadRanges(read_ranges, bitmap);
+    ASSERT_TRUE(filtered_ranges.empty());
+}
+
+// Rows 1000-1999 and 3000-6499 are selected, so FilterReadRanges must keep exactly the ranges
+// that overlap those rows; {6000, 7000} stays because rows 6000-6499 are selected.
+TEST_F(PrefetchFileBatchReaderImplTest, FilterReadRangesWithBitmap) {
+    // The ids are appended in ascending order without duplicates, so no intermediate std::set is
+    // needed. The unused 10000-row data array of the previous version was dropped.
+    std::vector<int32_t> bitmap_data;
+    for (int32_t i = 1000; i < 2000; i++) {
+        bitmap_data.push_back(i);
+    }
+    for (int32_t i = 3000; i < 6500; i++) {
+        bitmap_data.push_back(i);
+    }
+    auto bitmap = RoaringBitmap32::From(bitmap_data);
+    std::vector<std::pair<uint64_t, uint64_t>> read_ranges = {
+        {0, 1000},    {1000, 2000}, {2000, 3000}, {3000, 4000}, {4000, 5000},
+        {5000, 6000}, {6000, 7000}, {7000, 8000}, {8000, 9000}, {9000, 10000}};
+    auto filtered_ranges = PrefetchFileBatchReaderImpl::FilterReadRanges(read_ranges, bitmap);
+    std::vector<std::pair<uint64_t, uint64_t>> expected_filtered_ranges = {
+        {1000, 2000}, {3000, 4000}, {4000, 5000}, {5000, 6000}, {6000, 7000}};
+    ASSERT_EQ(expected_filtered_ranges, filtered_ranges);
+}
+/*
+ * Dispatching an empty list of ranges into 3 groups must yield 3 empty groups.
+ */
+TEST_F(PrefetchFileBatchReaderImplTest, DispatchReadRangesEmpty) {
+    std::vector<std::pair<uint64_t, uint64_t>> read_ranges;
+    auto read_ranges_in_group = 
PrefetchFileBatchReaderImpl::DispatchReadRanges(read_ranges, 3);
+    ASSERT_EQ(read_ranges_in_group.size(), 3);
+    ASSERT_TRUE(read_ranges_in_group[0].empty());
+    ASSERT_TRUE(read_ranges_in_group[1].empty());
+    ASSERT_TRUE(read_ranges_in_group[2].empty());
+}
+/*
+ * Four ranges dispatched into 3 groups: ranges 0 and 3 land in group 0, range 1 in group 1 and
+ * range 2 in group 2.
+ */
+TEST_F(PrefetchFileBatchReaderImplTest, DispatchReadRanges) {
+    std::vector<std::pair<uint64_t, uint64_t>> read_ranges = {
+        {0, 10000}, {10000, 20000}, {20000, 30000}, {30000, 40000}};
+    auto read_ranges_in_group = 
PrefetchFileBatchReaderImpl::DispatchReadRanges(read_ranges, 3);
+    std::vector<std::pair<uint64_t, uint64_t>> expected_group_0 = {{0, 10000}, 
{30000, 40000}};
+    ASSERT_EQ(read_ranges_in_group[0], expected_group_0);
+    std::vector<std::pair<uint64_t, uint64_t>> expected_group_1 = {{10000, 
20000}};
+    ASSERT_EQ(read_ranges_in_group[1], expected_group_1);
+    std::vector<std::pair<uint64_t, uint64_t>> expected_group_2 = {{20000, 
30000}};
+    ASSERT_EQ(read_ranges_in_group[2], expected_group_2);
+}
+
+TEST_F(PrefetchFileBatchReaderImplTest, RefreshReadRanges) {
+    auto data_array = PrepareArray(101);
+    int32_t batch_size = 30;
+    int32_t prefetch_max_parallel_num = 3;
+    MockFormatReaderBuilder reader_builder(data_array, data_type_, batch_size);
+    ASSERT_OK_AND_ASSIGN(
+        auto reader,
+        PrefetchFileBatchReaderImpl::Create(
+            /*data_file_path=*/"", &reader_builder, mock_fs_, 
prefetch_max_parallel_num, batch_size,
+            prefetch_max_parallel_num * 2, 
/*enable_adaptive_prefetch_strategy=*/false, executor_,
+            /*initialize_read_ranges=*/false, 
/*prefetch_cache_mode=*/PrefetchCacheMode::ALWAYS,
+            CacheConfig(), GetDefaultPool()));
+    auto prefetch_reader = 
dynamic_cast<PrefetchFileBatchReaderImpl*>(reader.get());
+    ASSERT_OK(prefetch_reader->RefreshReadRanges());
+    std::vector<std::pair<uint64_t, uint64_t>> read_ranges_0 = {{0, 30}, {90, 
101}};
+    auto mock_reader_0 = 
dynamic_cast<MockFileBatchReader*>(prefetch_reader->readers_[0].get());
+    ASSERT_EQ(mock_reader_0->GetReadRanges(), read_ranges_0);

Review Comment:
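   This test directly accesses `PrefetchFileBatchReaderImpl` internals 
(`readers_`, etc.), which are `private` in the header, so it will not compile 
unless the class exposes test hooks. One of the following fixes is required: 
(a) rewrite the assertions to use only the public API, or (b) add a minimal 
test-only accessor or friend declaration (e.g., `friend class 
PrefetchFileBatchReaderImplTest;` or a dedicated `TestingAccess` struct) to 
expose the needed state. Note that friendship is not inherited by the classes 
that `TEST_F` generates, so with a `friend` fixture the access has to go 
through helper methods on the fixture (or use gtest's `FRIEND_TEST`).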



##########
src/paimon/common/reader/prefetch_file_batch_reader_impl.cpp:
##########
@@ -0,0 +1,617 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#include "paimon/common/reader/prefetch_file_batch_reader_impl.h"
+
+#include <algorithm>
+#include <chrono>
+#include <future>
+#include <thread>
+
+#include "arrow/array/array_base.h"
+#include "arrow/c/abi.h"
+#include "arrow/c/bridge.h"
+#include "paimon/common/executor/future.h"
+#include "paimon/common/io/cache_input_stream.h"
+#include "paimon/common/metrics/metrics_impl.h"
+#include "paimon/common/reader/reader_utils.h"
+#include "paimon/common/utils/arrow/status_utils.h"
+#include "paimon/common/utils/scope_guard.h"
+#include "paimon/format/reader_builder.h"
+#include "paimon/fs/file_system.h"
+#include "paimon/utils/read_ahead_cache.h"
+
+namespace arrow {
+class Schema;
+}  // namespace arrow
+
+namespace paimon {
+/*
+ * Validates the arguments, builds `prefetch_max_parallel_num` underlying readers through
+ * `executor` and wraps them in a PrefetchFileBatchReaderImpl.
+ *
+ * Returns Status::Invalid when `prefetch_max_parallel_num` or `prefetch_batch_count` is 0, when
+ * `batch_size` is not positive, when `reader_builder`, `fs` or `executor` is null, or when a
+ * built reader cannot be cast to PrefetchFileBatchReader. Failures from opening the file,
+ * building a reader or refreshing the read ranges are returned as they are.
+ */
+Result<std::unique_ptr<PrefetchFileBatchReaderImpl>> 
PrefetchFileBatchReaderImpl::Create(
+    const std::string& data_file_path, const ReaderBuilder* reader_builder,
+    const std::shared_ptr<FileSystem>& fs, uint32_t prefetch_max_parallel_num, 
int32_t batch_size,
+    uint32_t prefetch_batch_count, bool enable_adaptive_prefetch_strategy,
+    const std::shared_ptr<Executor>& executor, bool initialize_read_ranges,
+    PrefetchCacheMode prefetch_cache_mode, const CacheConfig& cache_config,
+    const std::shared_ptr<MemoryPool>& pool) {
+    if (prefetch_max_parallel_num == 0) {
+        return Status::Invalid("prefetch max parallel num should be greater 
than 0.");
+    }
+    if (prefetch_batch_count == 0) {
+        return Status::Invalid("prefetch batch count should be greater than 
0.");
+    }
+    if (batch_size <= 0) {
+        return Status::Invalid("batch size should be greater than 0.");
+    }
+    if (reader_builder == nullptr) {
+        return Status::Invalid("reader_builder should not be nullptr.");
+    }
+    if (fs == nullptr) {
+        return Status::Invalid("file system should not be nullptr.");
+    }
+    if (executor == nullptr) {
+        return Status::Invalid("executor should not be nullptr.");
+    }
+
+    std::shared_ptr<ReadAheadCache> cache;  // stays null when the cache mode is NEVER
+    if (prefetch_cache_mode != PrefetchCacheMode::NEVER) {
+        PAIMON_ASSIGN_OR_RAISE(std::shared_ptr<InputStream> input_stream, 
fs->Open(data_file_path));
+        cache = std::make_shared<ReadAheadCache>(input_stream, cache_config, 
pool);
+    }
+    std::vector<std::future<Result<std::unique_ptr<FileBatchReader>>>> futures;
+    for (uint32_t i = 0; i < prefetch_max_parallel_num; i++) {  // one build task per reader; each opens its own stream
+        futures.push_back(Via(executor.get(),
+                              [&fs, &data_file_path, &reader_builder,
+                               &cache]() -> 
Result<std::unique_ptr<FileBatchReader>> {
+                                  
PAIMON_ASSIGN_OR_RAISE(std::unique_ptr<InputStream> input_stream,
+                                                         
fs->Open(data_file_path));
+                                  auto cache_input_stream = 
std::make_shared<CacheInputStream>(
+                                      std::move(input_stream), cache);
+                                  return 
reader_builder->Build(cache_input_stream);
+                              }));
+    }
+    std::vector<std::shared_ptr<PrefetchFileBatchReader>> readers;
+    for (auto& file_batch_reader : CollectAll(futures)) {  // NOTE(review): the tasks capture locals by reference; presumably CollectAll waits for all of them - confirm
+        if (!file_batch_reader.ok()) {
+            return file_batch_reader.status();
+        }
+        std::shared_ptr<FileBatchReader> reader = 
std::move(file_batch_reader).value();
+        auto prefetch_file_batch_reader =
+            std::dynamic_pointer_cast<PrefetchFileBatchReader>(reader);
+        if (prefetch_file_batch_reader == nullptr) {
+            return Status::Invalid(
+                "failed to cast to prefetch file batch reader. file format not 
support prefetch");
+        }
+        readers.emplace_back(prefetch_file_batch_reader);
+    }
+    if (prefetch_batch_count < readers.size()) {  // keep at least one queue slot per reader
+        prefetch_batch_count = readers.size();
+    }
+    uint32_t prefetch_queue_capacity = prefetch_batch_count / readers.size();  // per-reader share (integer division)
+
+    auto reader = std::unique_ptr<PrefetchFileBatchReaderImpl>(new 
PrefetchFileBatchReaderImpl(
+        readers, batch_size, prefetch_queue_capacity, 
enable_adaptive_prefetch_strategy, executor,
+        cache, prefetch_cache_mode));
+    if (initialize_read_ranges) {
+        // normally initialize read ranges should be false, as set read schema 
will refresh read
+        // ranges, and set read schema will always be called before read.
+        PAIMON_RETURN_NOT_OK(reader->RefreshReadRanges());
+    }
+    return reader;
+}
+
+// Stores the readers and configuration and creates, for every reader, an empty prefetch queue, a
+// position counter starting at 0 and a cleared "working" flag. parallel_num_ is set to the number
+// of readers.
+PrefetchFileBatchReaderImpl::PrefetchFileBatchReaderImpl(
+    const std::vector<std::shared_ptr<PrefetchFileBatchReader>>& readers, int32_t batch_size,
+    uint32_t prefetch_queue_capacity, bool enable_adaptive_prefetch_strategy,
+    const std::shared_ptr<Executor>& executor, const std::shared_ptr<ReadAheadCache>& cache,
+    PrefetchCacheMode cache_mode)
+    // `readers` is a const reference, so std::move() on it could only ever copy; the copy is now
+    // spelled out instead of being disguised as a move.
+    : readers_(readers),
+      batch_size_(batch_size),
+      executor_(executor),
+      cache_(cache),
+      cache_mode_(cache_mode),
+      prefetch_queue_capacity_(prefetch_queue_capacity),
+      enable_adaptive_prefetch_strategy_(enable_adaptive_prefetch_strategy) {
+    for (size_t i = 0; i < readers_.size(); i++) {
+        prefetch_queues_.emplace_back(std::make_unique<ThreadsafeQueue<PrefetchBatch>>());
+        readers_pos_.emplace_back(std::make_unique<std::atomic<uint64_t>>(0));
+        reader_is_working_.emplace_back(false);
+    }
+    parallel_num_ = readers_.size();
+}
+
+PrefetchFileBatchReaderImpl::~PrefetchFileBatchReaderImpl() {
+    // Best-effort teardown: a destructor cannot report the Status, so it is discarded on purpose.
+    static_cast<void>(CleanUp());
+}
+
+// Applies the read schema, predicate and selection bitmap to every underlying reader, remembers
+// the predicate and bitmap, and then refreshes the read ranges.
+Status PrefetchFileBatchReaderImpl::SetReadSchema(
+    ::ArrowSchema* read_schema, const std::shared_ptr<Predicate>& predicate,
+    const std::optional<RoaringBitmap32>& selection_bitmap) {
+    // Import the C schema once; each reader then receives its own freshly exported copy.
+    PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(std::shared_ptr<arrow::Schema> imported_schema,
+                                      arrow::ImportSchema(read_schema));
+    for (size_t idx = 0; idx < readers_.size(); ++idx) {
+        auto exported_schema = std::make_unique<::ArrowSchema>();
+        PAIMON_RETURN_NOT_OK_FROM_ARROW(
+            arrow::ExportSchema(*imported_schema, exported_schema.get()));
+        PAIMON_RETURN_NOT_OK(
+            readers_[idx]->SetReadSchema(exported_schema.get(), predicate, selection_bitmap));
+    }
+    selection_bitmap_ = selection_bitmap;
+    predicate_ = predicate;
+    return RefreshReadRanges();
+}
+
+// Resets the reader state, asks the first reader for the file's read ranges, decides whether
+// prefetching is used and installs the ranges that survive the selection bitmap.
+//
+// Prefetching is always on when the adaptive strategy is disabled. With the adaptive strategy it
+// is turned off when the first range holds more batches than one prefetch queue can buffer.
+Status PrefetchFileBatchReaderImpl::RefreshReadRanges() {
+    PAIMON_RETURN_NOT_OK(CleanUp());
+    // Initialized so the flag is never read indeterminate if GenReadRanges() leaves it untouched.
+    bool need_prefetch = true;
+    PAIMON_ASSIGN_OR_RAISE(auto read_ranges, readers_[0]->GenReadRanges(&need_prefetch));
+
+    if (!enable_adaptive_prefetch_strategy_) {
+        need_prefetch = true;
+    } else if (need_prefetch && !read_ranges.empty()) {
+        // The adaptive strategy is known to be enabled in this branch.
+        const uint64_t batch_count_in_range =
+            (read_ranges[0].second - read_ranges[0].first) / batch_size_;
+        if (batch_count_in_range > static_cast<uint64_t>(prefetch_queue_capacity_)) {
+            need_prefetch = false;
+        }
+    }
+
+    need_prefetch_ = need_prefetch;
+    PAIMON_RETURN_NOT_OK(SetReadRanges(FilterReadRanges(read_ranges, selection_bitmap_)));
+    read_ranges_freshed_ = true;
+
+    return Status::OK();
+}
+
+// Returns the ranges for which the bitmap's ContainsAny(first, second) is true. Without a bitmap
+// every range is returned unchanged.
+std::vector<std::pair<uint64_t, uint64_t>> PrefetchFileBatchReaderImpl::FilterReadRanges(
+    const std::vector<std::pair<uint64_t, uint64_t>>& read_ranges,
+    const std::optional<RoaringBitmap32>& selection_bitmap) {
+    if (!selection_bitmap.has_value()) {
+        return read_ranges;
+    }
+    const RoaringBitmap32& bitmap = selection_bitmap.value();
+    std::vector<std::pair<uint64_t, uint64_t>> kept;
+    for (size_t idx = 0; idx < read_ranges.size(); ++idx) {
+        const std::pair<uint64_t, uint64_t>& candidate = read_ranges[idx];
+        if (!bitmap.ContainsAny(candidate.first, candidate.second)) {
+            continue;
+        }
+        kept.push_back(candidate);
+    }
+    return kept;
+}
+
+// Splits `read_ranges` across the readers, pushes each reader's share down to it when
+// prefetching with more than one reader, and records the ranges followed by an EOF range.
+Status PrefetchFileBatchReaderImpl::SetReadRanges(
+    const std::vector<std::pair<uint64_t, uint64_t>>& read_ranges) {
+    // Pushing the ranges down lets each reader limit its IO to its own share.
+    const size_t reader_count = readers_.size();
+    read_ranges_in_group_ = DispatchReadRanges(read_ranges, reader_count);
+    // When prefetching is not needed, the per-reader ranges do not have to be set either.
+    const bool push_down_ranges = need_prefetch_ && reader_count > 1;
+    if (push_down_ranges) {
+        std::vector<std::future<Status>> pending;
+        for (size_t idx = 0; idx < reader_count; ++idx) {
+            pending.push_back(Via(executor_.get(), [this, idx]() -> Status {
+                return readers_[idx]->SetReadRanges(read_ranges_in_group_[idx]);
+            }));
+        }
+        for (const auto& outcome : CollectAll(pending)) {
+            if (!outcome.ok()) {
+                return outcome;
+            }
+        }
+    }
+    for (size_t idx = 0; idx < read_ranges.size(); ++idx) {
+        read_ranges_.push_back(read_ranges[idx]);
+    }
+    // A special range outside the file's row count is appended to trigger an EOF access.
+    std::pair<uint64_t, uint64_t> eof_range;
+    PAIMON_ASSIGN_OR_RAISE(eof_range, EofRange());
+    read_ranges_.push_back(eof_range);
+    for (auto& group : read_ranges_in_group_) {
+        group.push_back(eof_range);
+    }
+    return Status::OK();
+}
+
+// Distributes `read_ranges` round-robin over `group_count` groups: range i goes to group
+// i % group_count, so every group keeps its ranges in their original relative order.
+//
+// The result always has exactly `group_count` groups (some may be empty). For a `group_count` of
+// 0 an empty result is returned instead of evaluating `i % 0`, which is undefined behavior.
+std::vector<std::vector<std::pair<uint64_t, uint64_t>>>
+PrefetchFileBatchReaderImpl::DispatchReadRanges(
+    const std::vector<std::pair<uint64_t, uint64_t>>& read_ranges, size_t group_count) {
+    std::vector<std::vector<std::pair<uint64_t, uint64_t>>> read_ranges_in_group(group_count);
+    if (group_count == 0) {
+        return read_ranges_in_group;
+    }
+    for (size_t i = 0; i < read_ranges.size(); i++) {
+        read_ranges_in_group[i % group_count].push_back(read_ranges[i]);
+    }
+    return read_ranges_in_group;
+}
+/*
+ * Stops the background thread (if one exists) and resets the reader state: read ranges, prefetch
+ * queues, reader positions, working flags, the cache and the read status are cleared.
+ *
+ * Returns Status::Invalid if a background thread object exists but is not joinable; otherwise
+ * Status::OK().
+ */
+Status PrefetchFileBatchReaderImpl::CleanUp() {
+    auto clean_prefetch_queue = [this]() {
+        for (auto& prefetch_queue : prefetch_queues_) {
+            while (true) {
+                std::optional<PrefetchBatch> batch = prefetch_queue->try_pop();
+                {
+                    std::unique_lock<std::mutex> lock(working_mutex_);
+                    cv_.notify_one();  // signalled after every pop attempt, including the final empty one
+                }
+                if (batch == std::nullopt) {
+                    break;
+                }
+                
ReaderUtils::ReleaseReadBatch(std::move(batch.value().batch.first));
+            }
+        }
+    };
+    // Clear the existing read ranges and prefetch queue
+    {
+        std::unique_lock<std::mutex> lock(working_mutex_);
+        is_shutdown_ = true;  // set is shutdown and check shutdown to avoid 
block at queue.push
+        cv_.notify_one();
+    }
+    // Join and reset the background thread if it exists
+    if (background_thread_) {
+        if (background_thread_->joinable()) {
+            background_thread_->join();
+            background_thread_.reset();
+        } else {
+            return Status::Invalid("background thread is not joinable");  // NOTE(review): is_shutdown_ is still true on this path
+        }
+    }
+
+    read_ranges_.clear();
+    read_ranges_in_group_.clear();
+    clean_prefetch_queue();
+    for (size_t i = 0; i < readers_pos_.size(); i++) {
+        readers_pos_[i]->store(0);
+        reader_is_working_[i] = false;
+    }
+    is_shutdown_ = false;  // clear the flag again so the reader can be reused
+    if (cache_) {
+        cache_->Reset();
+    }
+    SetReadStatus(Status::OK());
+    return Status::OK();
+}
+
+// Decides from the cache mode, the predicate and the selection bitmap whether the cache should
+// be initialized: NEVER -> no, ALWAYS -> yes, and each EXCLUDE_* mode -> only when the excluded
+// filter (predicate and/or bitmap) is absent.
+bool PrefetchFileBatchReaderImpl::NeedInitCache() const {
+    if (cache_mode_ == PrefetchCacheMode::NEVER) {
+        return false;
+    }
+    if (cache_mode_ == PrefetchCacheMode::EXCLUDE_PREDICATE) {
+        return predicate_ == nullptr;
+    }
+    if (cache_mode_ == PrefetchCacheMode::EXCLUDE_BITMAP) {
+        return selection_bitmap_ == std::nullopt;
+    }
+    if (cache_mode_ == PrefetchCacheMode::EXCLUDE_BITMAP_OR_PREDICATE) {
+        return predicate_ == nullptr && selection_bitmap_ == std::nullopt;
+    }
+    if (cache_mode_ == PrefetchCacheMode::ALWAYS) {
+        return true;
+    }
+    // Unknown mode: trips the assertion when assertions are enabled, otherwise returns true.
+    assert(false);
+    return true;
+}
+/*
+ * Scheduling loop. It first initializes the read-ahead cache when one exists and NeedInitCache()
+ * says so, then keeps submitting ReadBatch(reader_idx) tasks to the executor for every reader
+ * that has no task in flight, has room in its prefetch queue and has not reached the end marker.
+ * The loop exits when the read status is an error, on shutdown, or when every reader is
+ * finished; Wait(futures) is called on the way out.
+ */
+void PrefetchFileBatchReaderImpl::Workloop() {
+    std::vector<std::future<void>> futures;
+    futures.resize(readers_.size());  // one slot per reader; an invalid future means "no task yet"
+    if (cache_ && NeedInitCache()) {
+        auto read_ranges = readers_[0]->PreBufferRange();
+        if (read_ranges.ok()) {
+            std::vector<ByteRange> ranges;
+            for (const auto& read_range : read_ranges.value()) {
+                ranges.emplace_back(read_range.first, read_range.second);
+            }
+            auto s = cache_->Init(std::move(ranges));
+            if (!s.ok()) {
+                SetReadStatus(s);  // the failed status makes the loop below exit on its first check
+            }
+        } else {
+            SetReadStatus(read_ranges.status());
+        }
+    }
+
+    while (true) {
+        if (!GetReadStatus().ok()) {
+            break;
+        }
+        if (is_shutdown_) {
+            break;
+        }
+        bool all_finished = true;
+        for (const auto& reader_pos : readers_pos_) {
+            if (reader_pos->load() != std::numeric_limits<uint64_t>::max()) {  // uint64 max is the position treated as "finished"
+                all_finished = false;
+            }
+        }
+        if (all_finished) {
+            break;
+        }
+
+        bool made_progress_this_iteration = false;
+        for (size_t reader_idx = 0; reader_idx < readers_.size(); 
reader_idx++) {
+            if (!futures[reader_idx].valid() ||
+                (futures[reader_idx].wait_for(std::chrono::microseconds(0)) ==
+                 std::future_status::ready)) {
+                if (futures[reader_idx].valid()) {
+                    futures[reader_idx].get();  // consume the finished task before the slot is reused
+                }
+                if (prefetch_queues_[reader_idx]->size() >= 
prefetch_queue_capacity_) {
+                    // queue is full, skip
+                    continue;
+                }
+                if (readers_pos_[reader_idx]->load() != 
std::numeric_limits<uint64_t>::max()) {
+                    futures[reader_idx] =
+                        Via(executor_.get(), [this, reader_idx]() { 
ReadBatch(reader_idx); });
+                    made_progress_this_iteration = true;
+                }
+            }
+        }
+        if (!made_progress_this_iteration) {  // nothing could be scheduled: block until there is work or a shutdown
+            std::unique_lock<std::mutex> lock(working_mutex_);
+            cv_.wait(lock, [this] {
+                if (is_shutdown_) {
+                    return true;
+                }
+                for (size_t i = 0; i < reader_is_working_.size(); i++) {
+                    if (reader_is_working_[i]) {
+                        continue;
+                    }
+                    if (prefetch_queues_[i]->size() >= 
prefetch_queue_capacity_) {
+                        continue;
+                    }
+                    if (readers_pos_[i]->load() == 
std::numeric_limits<uint64_t>::max()) {
+                        continue;
+                    }
+                    return true;  // reader i is idle, has queue space and is not finished
+                }
+                return false;
+            });
+        }
+    }
+    Wait(futures);
+}
+
+// Task body scheduled by the work loop for one reader: performs a single DoReadBatch() step
+// and, because this function returns void, records a failed status through SetReadStatus().
+void PrefetchFileBatchReaderImpl::ReadBatch(size_t reader_idx) {
+    if (Status read_status = DoReadBatch(reader_idx); !read_status.ok()) {
+        SetReadStatus(read_status);
+    }
+}
+
+// Returns the first range assigned to reader `reader_idx` whose end (`second`) is still greater
+// than the reader's current position, or std::nullopt when every assigned range ends at or
+// before that position.
+std::optional<std::pair<uint64_t, uint64_t>> PrefetchFileBatchReaderImpl::GetCurrentReadRange(
+    size_t reader_idx) const {
+    const auto& group_ranges = read_ranges_in_group_[reader_idx];
+    const uint64_t position = readers_pos_[reader_idx]->load();
+
+    const auto unfinished =
+        std::find_if(group_ranges.begin(), group_ranges.end(),
+                     [position](const auto& candidate) { return position < candidate.second; });
+    if (unfinished == group_ranges.end()) {
+        return std::nullopt;
+    }
+    return *unfinished;
+}
+
+// Positions the underlying reader `reader_idx` before a read. The target row is the larger of
+// the reader's recorded position and the start of `current_read_range`; SeekToRow() is only
+// issued when the reader is not already at that row.
+Status PrefetchFileBatchReaderImpl::EnsureReaderPosition(
+    size_t reader_idx, const std::pair<uint64_t, uint64_t>& current_read_range) const {
+    const uint64_t target_row =
+        std::max(readers_pos_[reader_idx]->load(), current_read_range.first);
+    const auto& reader = readers_[reader_idx];
+    if (reader->GetNextRowToRead() == target_row) {
+        // Already in place, no seek needed.
+        return Status::OK();
+    }
+    return reader->SeekToRow(target_row);
+}
+
+// Post-processes a batch just read by reader `reader_idx` for `read_range`:
+//  - EOF batch: pushed to the reader's queue tagged with EofRange(), and the reader position is
+//    set to uint64 max (the value the work loop treats as "finished").
+//  - batch starting at or after the range end: dropped, position set to its first row.
+//  - batch crossing the range end: truncated to the rows inside the range, position set to the
+//    range end.
+//  - batch fully inside the range: position set to the reader's next row.
+// A batch whose bitmap is empty after this is released instead of being queued.
+Status PrefetchFileBatchReaderImpl::HandleReadResult(
+    size_t reader_idx, const std::pair<uint64_t, uint64_t>& read_range,
+    ReadBatchWithBitmap&& read_batch_with_bitmap) {
+    PAIMON_ASSIGN_OR_RAISE(uint64_t first_row_number,
+                           readers_[reader_idx]->GetPreviousBatchFirstRowNumber());
+    auto& queue = prefetch_queues_[reader_idx];
+    auto& reader_pos = readers_pos_[reader_idx];
+
+    if (BatchReader::IsEofBatch(read_batch_with_bitmap)) {
+        std::pair<uint64_t, uint64_t> eof_range;
+        PAIMON_ASSIGN_OR_RAISE(eof_range, EofRange());
+        queue->push({eof_range, std::move(read_batch_with_bitmap), first_row_number});
+        reader_pos->store(std::numeric_limits<uint64_t>::max());
+        return Status::OK();
+    }
+
+    auto& [read_batch, bitmap] = read_batch_with_bitmap;
+    auto& [c_array, c_schema] = read_batch;
+    const uint64_t range_end = read_range.second;
+
+    if (first_row_number >= range_end) {
+        // Fully out of range: data before first_row_number has been filtered out.
+        reader_pos->store(first_row_number);
+        ReaderUtils::ReleaseReadBatch(std::move(read_batch));
+        return Status::OK();
+    }
+    if (first_row_number + c_array->length > range_end) {
+        // Partially out of range: data before the range end has been effectively consumed.
+        reader_pos->store(range_end);
+        // Round-trip through an arrow::Array to slice off the rows past the range end, then
+        // export the slice back into the same C structs.
+        PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(std::shared_ptr<arrow::Array> src_array,
+                                          arrow::ImportArray(c_array.get(), c_schema.get()));
+        int32_t target_length = range_end - first_row_number;
+        auto array = src_array->Slice(/*offset=*/0, target_length);
+        PAIMON_RETURN_NOT_OK_FROM_ARROW(
+            arrow::ExportArray(*array, c_array.get(), c_schema.get()));
+        bitmap.RemoveRange(target_length, src_array->length());
+    } else {
+        // Entirely within the range: data before the reader's next row has been effectively
+        // consumed.
+        reader_pos->store(readers_[reader_idx]->GetNextRowToRead());
+    }
+
+    if (bitmap.IsEmpty()) {
+        ReaderUtils::ReleaseReadBatch(std::move(read_batch));
+        return Status::OK();
+    }
+    queue->push({read_range, std::move(read_batch_with_bitmap), first_row_number});
+    return Status::OK();
+}
+
+// Performs one prefetch step for reader `reader_idx`: picks its current read range, positions
+// the reader, reads the next batch and hands it to HandleReadResult(). Returns OK without
+// reading when the reader is shut down or has no range left; returns the stored read status
+// when it is already an error.
+Status PrefetchFileBatchReaderImpl::DoReadBatch(size_t reader_idx) {
+    PAIMON_RETURN_NOT_OK(GetReadStatus());
+    if (is_shutdown_) {
+        return Status::OK();
+    }
+    const std::optional<std::pair<uint64_t, uint64_t>> pending_range =
+        GetCurrentReadRange(reader_idx);
+    if (!pending_range.has_value()) {
+        // No more read ranges for this reader, gracefully exit.
+        return Status::OK();
+    }
+
+    // Armed before the flag is raised so that every exit path below clears
+    // reader_is_working_[reader_idx] and wakes one waiter on cv_.
+    ScopeGuard clear_working_flag([&]() {
+        std::unique_lock<std::mutex> lock(working_mutex_);
+        reader_is_working_[reader_idx] = false;
+        cv_.notify_one();
+    });
+    {
+        std::unique_lock<std::mutex> lock(working_mutex_);
+        reader_is_working_[reader_idx] = true;
+    }
+
+    PAIMON_RETURN_NOT_OK(EnsureReaderPosition(reader_idx, *pending_range));
+
+    PAIMON_ASSIGN_OR_RAISE(ReadBatchWithBitmap fetched_batch,
+                           readers_[reader_idx]->NextBatchWithBitmap());
+
+    return HandleReadResult(reader_idx, *pending_range, std::move(fetched_batch));
+}
+
+// Hands out the next prefetched batch. Starts the background work loop thread on first use,
+// then polls the per-reader queues until one of the following happens:
+//  - a queue head carries the range at the front of read_ranges_: that batch is popped and
+//    returned;
+//  - every queue head is an EOF entry: an EOF batch is returned;
+//  - the stored read status is an error, or the reader is shut down: an error is returned.
+// When every queue has a head but none matches, ranges that start before the smallest queued
+// range are dropped from read_ranges_; when some queue is still empty, the loop sleeps 1us.
+Result<BatchReader::ReadBatchWithBitmap> PrefetchFileBatchReaderImpl::NextBatchWithBitmap() {
+    if (!read_ranges_freshed_) {
+        return Status::Invalid("prefetch reader read ranges are not initialized");
+    }
+    if (!background_thread_) {
+        background_thread_ =
+            std::make_unique<std::thread>(&PrefetchFileBatchReaderImpl::Workloop, this);
+    }
+
+    for (;;) {
+        PAIMON_RETURN_NOT_OK(GetReadStatus());
+        if (is_shutdown_) {
+            return Status::Invalid(
+                "prefetch reader has inconsistent state, maybe read while closing reader or change "
+                "read schema");
+        }
+
+        std::optional<std::pair<uint64_t, uint64_t>> smallest_range;
+        size_t queues_at_eof = 0;
+        size_t queues_with_data = 0;
+        for (auto& queue : prefetch_queues_) {
+            PAIMON_RETURN_NOT_OK(GetReadStatus());
+            const PrefetchBatch* head = queue->try_front();
+            if (head == nullptr) {
+                continue;
+            }
+            // Track the queued range with the smallest start row.
+            if (!smallest_range.has_value() ||
+                head->read_range.first < smallest_range.value().first) {
+                smallest_range = head->read_range;
+            }
+            ++queues_with_data;
+            PAIMON_ASSIGN_OR_RAISE(bool head_is_eof, IsEofRange(head->read_range));
+            if (head_is_eof) {
+                ++queues_at_eof;
+                continue;
+            }
+
+            if (head->read_range == read_ranges_.front()) {
+                auto popped = queue->try_pop();
+                {
+                    // A slot was freed in this queue; wake a waiter on cv_.
+                    std::unique_lock<std::mutex> lock(working_mutex_);
+                    cv_.notify_one();
+                }
+                previous_batch_first_row_num_ = popped.value().previous_batch_first_row_num;
+                return std::move(popped).value().batch;
+            }
+        }
+
+        if (queues_at_eof == prefetch_queues_.size()) {
+            const PrefetchBatch* first_head = prefetch_queues_[0]->try_front();
+            if (first_head == nullptr) {
+                assert(false);
+                return Status::Invalid("peek batch not suppose to be nullptr");
+            }
+            previous_batch_first_row_num_ = first_head->previous_batch_first_row_num;
+            return BatchReader::MakeEofBatchWithBitmap();
+        }
+
+        if (queues_with_data != prefetch_queues_.size()) {
+            // At least one queue has nothing to peek yet; back off briefly and poll again.
+            std::this_thread::sleep_for(std::chrono::microseconds(1));
+            continue;
+        }
+        // Every queue has a head and none matched the front range: discard ranges that start
+        // before the smallest queued range.
+        while (!read_ranges_.empty() &&
+               read_ranges_.front().first < smallest_range.value().first) {
+            read_ranges_.pop_front();
+        }
+    }
+}
+
+// Seeking is not offered by the prefetch reader: `row_number` is ignored and a NotImplemented
+// status is always returned.
+Status PrefetchFileBatchReaderImpl::SeekToRow(uint64_t row_number) {
+    return Status::NotImplemented("not support seek to row for prefetch reader");
+}
+
+// Returns the metrics object produced by MetricsImpl::CollectReadMetrics() for all underlying
+// readers.
+std::shared_ptr<Metrics> PrefetchFileBatchReaderImpl::GetReaderMetrics() const {
+    auto collected_metrics = MetricsImpl::CollectReadMetrics(readers_);
+    return collected_metrics;
+}
+
+// Returns the file schema reported by the first underlying reader. At least one reader must
+// exist (asserted in debug builds).
+Result<std::unique_ptr<::ArrowSchema>> PrefetchFileBatchReaderImpl::GetFileSchema() const {
+    assert(!readers_.empty());
+    const auto& first_reader = readers_.front();
+    return first_reader->GetFileSchema();
+}
+
+// Returns the first-row number recorded by NextBatchWithBitmap() for the batch it most recently
+// handed out.
+Result<uint64_t> PrefetchFileBatchReaderImpl::GetPreviousBatchFirstRowNumber() const {
+    const uint64_t first_row = previous_batch_first_row_num_;
+    return first_row;
+}
+
+// Returns the row count reported by the first underlying reader. At least one reader must exist
+// (asserted in debug builds).
+Result<uint64_t> PrefetchFileBatchReaderImpl::GetNumberOfRows() const {
+    assert(!readers_.empty());
+    const auto& first_reader = readers_.front();
+    return first_reader->GetNumberOfRows();
+}
+
+// Not supported by the prefetch reader. Builds with assertions enabled abort here; when
+// assertions are compiled out (NDEBUG) the call returns the maximum uint64_t value, which is
+// what the previous `return -1` converted to.
+// NOTE(review): this is the same value readers_pos_ uses to mark a finished reader, so callers
+// cannot tell "unsupported" from a real position - confirm no caller relies on this result.
+uint64_t PrefetchFileBatchReaderImpl::GetNextRowToRead() const {
+    assert(false);
+    return std::numeric_limits<uint64_t>::max();
+}

Review Comment:
   `GetNextRowToRead()` returns `-1`, which converts to the maximum `uint64_t` 
value. In release builds the `assert(false)` is compiled out, so callers 
silently receive this sentinel and can break. If this API is truly unsupported 
here, give it well-defined behavior, for example by returning a sensible value 
derived from internal state (e.g., the next expected row); if the interface 
requires a real value, consider tracking and returning the minimum next row 
across readers instead of an invalid sentinel.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to