This is an automated email from the ASF dual-hosted git repository.

leaves12138 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-cpp.git


The following commit(s) were added to refs/heads/main by this push:
     new d5755eb  feat: add prefetch file batch reader implementation (#70)
d5755eb is described below

commit d5755eb047f89e3e9affcc38c838980a8bdcd71e
Author: Yonghao Fang <[email protected]>
AuthorDate: Wed Jun 10 11:00:10 2026 +0800

    feat: add prefetch file batch reader implementation (#70)
---
 .../reader/prefetch_file_batch_reader_impl.cpp     | 617 ++++++++++++++
 .../reader/prefetch_file_batch_reader_impl.h       | 172 ++++
 .../prefetch_file_batch_reader_impl_test.cpp       | 914 +++++++++++++++++++++
 3 files changed, 1703 insertions(+)

diff --git a/src/paimon/common/reader/prefetch_file_batch_reader_impl.cpp 
b/src/paimon/common/reader/prefetch_file_batch_reader_impl.cpp
new file mode 100644
index 0000000..95a11d3
--- /dev/null
+++ b/src/paimon/common/reader/prefetch_file_batch_reader_impl.cpp
@@ -0,0 +1,617 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#include "paimon/common/reader/prefetch_file_batch_reader_impl.h"
+
+#include <algorithm>
+#include <chrono>
+#include <future>
+#include <thread>
+
+#include "arrow/array/array_base.h"
+#include "arrow/c/abi.h"
+#include "arrow/c/bridge.h"
+#include "paimon/common/executor/future.h"
+#include "paimon/common/io/cache_input_stream.h"
+#include "paimon/common/metrics/metrics_impl.h"
+#include "paimon/common/reader/reader_utils.h"
+#include "paimon/common/utils/arrow/status_utils.h"
+#include "paimon/common/utils/scope_guard.h"
+#include "paimon/format/reader_builder.h"
+#include "paimon/fs/file_system.h"
+#include "paimon/utils/read_ahead_cache.h"
+
+namespace arrow {
+class Schema;
+}  // namespace arrow
+
+namespace paimon {
+
+// Builds a prefetching reader: validates the arguments, optionally creates a shared read-ahead
+// cache, constructs `prefetch_max_parallel_num` underlying readers concurrently on `executor`,
+// and splits the prefetch budget evenly across them.
+Result<std::unique_ptr<PrefetchFileBatchReaderImpl>> PrefetchFileBatchReaderImpl::Create(
+    const std::string& data_file_path, const ReaderBuilder* reader_builder,
+    const std::shared_ptr<FileSystem>& fs, uint32_t prefetch_max_parallel_num, int32_t batch_size,
+    uint32_t prefetch_batch_count, bool enable_adaptive_prefetch_strategy,
+    const std::shared_ptr<Executor>& executor, bool initialize_read_ranges,
+    PrefetchCacheMode prefetch_cache_mode, const CacheConfig& cache_config,
+    const std::shared_ptr<MemoryPool>& pool) {
+    // Argument validation; the order of the checks decides which error is reported first.
+    if (prefetch_max_parallel_num == 0) {
+        return Status::Invalid("prefetch max parallel num should be greater than 0.");
+    }
+    if (prefetch_batch_count == 0) {
+        return Status::Invalid("prefetch batch count should be greater than 0.");
+    }
+    if (batch_size <= 0) {
+        return Status::Invalid("batch size should be greater than 0.");
+    }
+    if (reader_builder == nullptr) {
+        return Status::Invalid("reader_builder should not be nullptr.");
+    }
+    if (fs == nullptr) {
+        return Status::Invalid("file system should not be nullptr.");
+    }
+    if (executor == nullptr) {
+        return Status::Invalid("executor should not be nullptr.");
+    }
+
+    // Unless caching is disabled, one read-ahead cache (over its own input stream) is shared by
+    // every underlying reader.
+    std::shared_ptr<ReadAheadCache> shared_cache;
+    if (prefetch_cache_mode != PrefetchCacheMode::NEVER) {
+        PAIMON_ASSIGN_OR_RAISE(std::shared_ptr<InputStream> cache_stream, fs->Open(data_file_path));
+        shared_cache = std::make_shared<ReadAheadCache>(cache_stream, cache_config, pool);
+    }
+
+    // Build every underlying reader concurrently, each on its own input stream wrapped by the
+    // shared cache. The tasks capture by value so they never refer to this stack frame.
+    std::vector<std::future<Result<std::unique_ptr<FileBatchReader>>>> build_futures;
+    build_futures.reserve(prefetch_max_parallel_num);
+    for (uint32_t n = 0; n < prefetch_max_parallel_num; ++n) {
+        build_futures.push_back(
+            Via(executor.get(), [fs, data_file_path, reader_builder,
+                                 shared_cache]() -> Result<std::unique_ptr<FileBatchReader>> {
+                PAIMON_ASSIGN_OR_RAISE(std::unique_ptr<InputStream> stream,
+                                       fs->Open(data_file_path));
+                auto cached_stream =
+                    std::make_shared<CacheInputStream>(std::move(stream), shared_cache);
+                return reader_builder->Build(cached_stream);
+            }));
+    }
+
+    // Every built reader must support prefetching; the first failure is returned.
+    std::vector<std::shared_ptr<PrefetchFileBatchReader>> readers;
+    for (auto& built : CollectAll(build_futures)) {
+        if (!built.ok()) {
+            return built.status();
+        }
+        std::shared_ptr<FileBatchReader> base_reader = std::move(built).value();
+        auto prefetch_reader = std::dynamic_pointer_cast<PrefetchFileBatchReader>(base_reader);
+        if (!prefetch_reader) {
+            return Status::Invalid(
+                "failed to cast to prefetch file batch reader. file format not support prefetch");
+        }
+        readers.push_back(std::move(prefetch_reader));
+    }
+
+    // Each reader gets at least one queue slot: the batch budget is raised to the reader count
+    // when smaller, then divided evenly.
+    const auto reader_count = static_cast<uint32_t>(readers.size());
+    const uint32_t queue_capacity = std::max(prefetch_batch_count, reader_count) / reader_count;
+
+    std::unique_ptr<PrefetchFileBatchReaderImpl> prefetch_reader_impl(
+        new PrefetchFileBatchReaderImpl(readers, batch_size, queue_capacity,
+                                        enable_adaptive_prefetch_strategy, executor, shared_cache,
+                                        prefetch_cache_mode));
+    // Normally initialize_read_ranges is false: SetReadSchema() refreshes the read ranges and is
+    // always called before reading.
+    if (initialize_read_ranges) {
+        PAIMON_RETURN_NOT_OK(prefetch_reader_impl->RefreshReadRanges());
+    }
+    return prefetch_reader_impl;
+}
+
+// Stores the configuration and allocates one prefetch queue, one position counter (starting at
+// row 0) and one "working" flag per underlying reader.
+PrefetchFileBatchReaderImpl::PrefetchFileBatchReaderImpl(
+    const std::vector<std::shared_ptr<PrefetchFileBatchReader>>& readers, int32_t batch_size,
+    uint32_t prefetch_queue_capacity, bool enable_adaptive_prefetch_strategy,
+    const std::shared_ptr<Executor>& executor, const std::shared_ptr<ReadAheadCache>& cache,
+    PrefetchCacheMode cache_mode)
+    // `readers` is a const reference, so it is copied; std::move would be a misleading no-op.
+    : readers_(readers),
+      batch_size_(batch_size),
+      executor_(executor),
+      cache_(cache),
+      cache_mode_(cache_mode),
+      prefetch_queue_capacity_(prefetch_queue_capacity),
+      enable_adaptive_prefetch_strategy_(enable_adaptive_prefetch_strategy) {
+    const size_t reader_count = readers_.size();
+    prefetch_queues_.reserve(reader_count);
+    readers_pos_.reserve(reader_count);
+    reader_is_working_.reserve(reader_count);
+    for (size_t i = 0; i < reader_count; i++) {
+        prefetch_queues_.emplace_back(std::make_unique<ThreadsafeQueue<PrefetchBatch>>());
+        readers_pos_.emplace_back(std::make_unique<std::atomic<uint64_t>>(0));
+        reader_is_working_.emplace_back(false);
+    }
+    parallel_num_ = static_cast<int32_t>(reader_count);
+}
+
+PrefetchFileBatchReaderImpl::~PrefetchFileBatchReaderImpl() {
+    // Stop the background prefetch thread and drop any queued batches. A destructor cannot
+    // report a Status, so the result of CleanUp() is deliberately discarded.
+    static_cast<void>(CleanUp());
+}
+
+// Applies a new read schema, predicate and selection bitmap to every underlying reader, then
+// recomputes the read ranges. `read_schema` is consumed (imported) on entry.
+Status PrefetchFileBatchReaderImpl::SetReadSchema(
+    ::ArrowSchema* read_schema, const std::shared_ptr<Predicate>& predicate,
+    const std::optional<RoaringBitmap32>& selection_bitmap) {
+    PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(std::shared_ptr<arrow::Schema> schema,
+                                      arrow::ImportSchema(read_schema));
+
+    // Stop the background prefetch thread (CleanUp() joins it) before touching the underlying
+    // readers. Otherwise SetReadSchema() on a reader could race with NextBatchWithBitmap() calls
+    // issued by in-flight worker tasks. Reading is disallowed until the ranges are refreshed.
+    PAIMON_RETURN_NOT_OK(CleanUp());
+    read_ranges_freshed_ = false;
+
+    for (const auto& reader : readers_) {
+        // Each reader receives its own exported copy of the schema.
+        auto c_schema = std::make_unique<::ArrowSchema>();
+        // If the reader did not take ownership (e.g. on an error path), release the exported
+        // schema ourselves as required by the Arrow C data interface; otherwise it leaks.
+        ScopeGuard release_guard([&c_schema]() {
+            if (c_schema->release != nullptr) {
+                c_schema->release(c_schema.get());
+            }
+        });
+        PAIMON_RETURN_NOT_OK_FROM_ARROW(arrow::ExportSchema(*schema, c_schema.get()));
+        PAIMON_RETURN_NOT_OK(reader->SetReadSchema(c_schema.get(), predicate, selection_bitmap));
+    }
+    selection_bitmap_ = selection_bitmap;
+    predicate_ = predicate;
+    return RefreshReadRanges();
+}
+
+// Stops prefetching, regenerates the read ranges from the first underlying reader, decides
+// whether prefetching is worthwhile, and installs the (bitmap-filtered) ranges.
+Status PrefetchFileBatchReaderImpl::RefreshReadRanges() {
+    PAIMON_RETURN_NOT_OK(CleanUp());
+    // Mark the ranges stale until they are fully rebuilt. If anything below fails,
+    // NextBatchWithBitmap() then reports "not initialized" instead of reading with the ranges
+    // CleanUp() just cleared.
+    read_ranges_freshed_ = false;
+
+    // Initialized in case GenReadRanges() leaves it untouched.
+    bool need_prefetch = false;
+    PAIMON_ASSIGN_OR_RAISE(auto read_ranges, readers_[0]->GenReadRanges(&need_prefetch));
+
+    if (!enable_adaptive_prefetch_strategy_) {
+        need_prefetch = true;
+    } else if (need_prefetch && !read_ranges.empty()) {
+        // Adaptive strategy: disable prefetching when the first range alone holds more batches
+        // than one prefetch queue can buffer.
+        const uint64_t batch_count_in_range =
+            (read_ranges[0].second - read_ranges[0].first) / static_cast<uint64_t>(batch_size_);
+        if (batch_count_in_range > static_cast<uint64_t>(prefetch_queue_capacity_)) {
+            need_prefetch = false;
+        }
+    }
+
+    need_prefetch_ = need_prefetch;
+    PAIMON_RETURN_NOT_OK(SetReadRanges(FilterReadRanges(read_ranges, selection_bitmap_)));
+    read_ranges_freshed_ = true;
+    return Status::OK();
+}
+
+// Keeps, in order, only the ranges that contain at least one row selected by
+// `selection_bitmap`; without a bitmap the input is returned unchanged.
+std::vector<std::pair<uint64_t, uint64_t>> PrefetchFileBatchReaderImpl::FilterReadRanges(
+    const std::vector<std::pair<uint64_t, uint64_t>>& read_ranges,
+    const std::optional<RoaringBitmap32>& selection_bitmap) {
+    if (!selection_bitmap.has_value()) {
+        return read_ranges;
+    }
+    const RoaringBitmap32& selected_rows = *selection_bitmap;
+    std::vector<std::pair<uint64_t, uint64_t>> kept_ranges;
+    for (const auto& [range_begin, range_end] : read_ranges) {
+        if (!selected_rows.ContainsAny(range_begin, range_end)) {
+            continue;
+        }
+        kept_ranges.emplace_back(range_begin, range_end);
+    }
+    return kept_ranges;
+}
+
+// Installs `read_ranges` as the ordered list of row ranges to read. The ranges are dispatched
+// round-robin to the underlying readers, and a sentinel EOF range is appended to every list.
+Status PrefetchFileBatchReaderImpl::SetReadRanges(
+    const std::vector<std::pair<uint64_t, uint64_t>>& read_ranges) {
+    // Resolve the EOF sentinel (a range just past the file's row count that triggers an EOF
+    // access) first, so a failure here leaves no partially updated state.
+    std::pair<uint64_t, uint64_t> eof_range;
+    PAIMON_ASSIGN_OR_RAISE(eof_range, EofRange());
+
+    read_ranges_in_group_ = DispatchReadRanges(read_ranges, readers_.size());
+    if (need_prefetch_ && readers_.size() > 1) {
+        // Push the per-reader ranges down to reduce IO amplification. If prefetching isn't
+        // necessary, setting read ranges on the underlying readers isn't needed either.
+        std::vector<std::future<Status>> futures;
+        futures.reserve(readers_.size());
+        for (size_t i = 0; i < readers_.size(); i++) {
+            futures.push_back(Via(executor_.get(), [this, i]() -> Status {
+                return readers_[i]->SetReadRanges(read_ranges_in_group_[i]);
+            }));
+        }
+        for (const auto& status : CollectAll(futures)) {
+            if (!status.ok()) {
+                return status;
+            }
+        }
+    }
+
+    // Replace (rather than append to) the global list so it stays consistent with
+    // read_ranges_in_group_, which was replaced above.
+    read_ranges_.assign(read_ranges.begin(), read_ranges.end());
+    read_ranges_.push_back(eof_range);
+    for (auto& group_ranges : read_ranges_in_group_) {
+        group_ranges.push_back(eof_range);
+    }
+    return Status::OK();
+}
+
+// Splits `read_ranges` into `group_count` lists round-robin: range k goes to list
+// k % group_count, so each list preserves the relative order of its ranges.
+// `group_count` must be non-zero.
+std::vector<std::vector<std::pair<uint64_t, uint64_t>>>
+PrefetchFileBatchReaderImpl::DispatchReadRanges(
+    const std::vector<std::pair<uint64_t, uint64_t>>& read_ranges, size_t group_count) {
+    std::vector<std::vector<std::pair<uint64_t, uint64_t>>> groups(group_count);
+    size_t target_group = 0;
+    for (const auto& range : read_ranges) {
+        groups[target_group].push_back(range);
+        target_group = (target_group + 1 == group_count) ? 0 : target_group + 1;
+    }
+    return groups;
+}
+
+// Stops the background prefetch thread and resets all per-read state: read ranges, prefetch
+// queues, reader positions, working flags, the read-ahead cache and the read status.
+// Returns Invalid if the background thread object exists but is not joinable.
+Status PrefetchFileBatchReaderImpl::CleanUp() {
+    // Drains every prefetch queue and releases the Arrow data of each dropped batch. Each pop is
+    // followed by a notification on cv_, whose waiter (the work loop) checks queue sizes.
+    auto clean_prefetch_queue = [this]() {
+        for (auto& prefetch_queue : prefetch_queues_) {
+            while (true) {
+                std::optional<PrefetchBatch> batch = prefetch_queue->try_pop();
+                {
+                    std::unique_lock<std::mutex> lock(working_mutex_);
+                    cv_.notify_one();
+                }
+                if (batch == std::nullopt) {
+                    break;
+                }
+                ReaderUtils::ReleaseReadBatch(std::move(batch.value().batch.first));
+            }
+        }
+    };
+    // Request shutdown under the lock and wake the work loop, which checks is_shutdown_ both at
+    // the top of its loop and in its cv_ wait predicate.
+    {
+        std::unique_lock<std::mutex> lock(working_mutex_);
+        is_shutdown_ = true;  // set is shutdown and check shutdown to avoid block at queue.push
+        cv_.notify_one();
+    }
+    // Join and reset the background thread if it exists.
+    // NOTE(review): on the "not joinable" error path is_shutdown_ stays true, so subsequent
+    // NextBatchWithBitmap() calls report an inconsistent state.
+    if (background_thread_) {
+        if (background_thread_->joinable()) {
+            background_thread_->join();
+            background_thread_.reset();
+        } else {
+            return Status::Invalid("background thread is not joinable");
+        }
+    }
+
+    // The background thread is gone, so the remaining state can be reset without racing it.
+    read_ranges_.clear();
+    read_ranges_in_group_.clear();
+    clean_prefetch_queue();
+    for (size_t i = 0; i < readers_pos_.size(); i++) {
+        readers_pos_[i]->store(0);
+        reader_is_working_[i] = false;
+    }
+    is_shutdown_ = false;
+    if (cache_) {
+        cache_->Reset();
+    }
+    SetReadStatus(Status::OK());
+    return Status::OK();
+}
+
+// Decides, from the cache mode and whether a predicate / selection bitmap is currently set,
+// whether the work loop should initialize the shared read-ahead cache before reading.
+bool PrefetchFileBatchReaderImpl::NeedInitCache() const {
+    const bool has_predicate = (predicate_ != nullptr);
+    const bool has_bitmap = selection_bitmap_.has_value();
+    switch (cache_mode_) {
+        case PrefetchCacheMode::ALWAYS:
+            return true;
+        case PrefetchCacheMode::NEVER:
+            return false;
+        case PrefetchCacheMode::EXCLUDE_PREDICATE:
+            return !has_predicate;
+        case PrefetchCacheMode::EXCLUDE_BITMAP:
+            return !has_bitmap;
+        case PrefetchCacheMode::EXCLUDE_BITMAP_OR_PREDICATE:
+            return !has_predicate && !has_bitmap;
+        default:
+            // Unknown mode: trips in debug builds, falls back to initializing the cache otherwise.
+            assert(false);
+            return true;
+    }
+}
+
+// Body of the background prefetch thread. It optionally initializes the read-ahead cache, then
+// repeatedly dispatches one ReadBatch() task per idle underlying reader onto executor_. A reader
+// is dispatched only while its prefetch queue is below capacity and it has not reached EOF.
+// The loop exits on error, on shutdown, or when every reader has reached EOF.
+void PrefetchFileBatchReaderImpl::Workloop() {
+    // One future per reader; an invalid (default-constructed) future means no task is pending.
+    std::vector<std::future<void>> futures;
+    futures.resize(readers_.size());
+    // Initialize the shared cache with the byte ranges reported by the first reader's
+    // PreBufferRange(); failures are recorded in the read status, which ends the loop below.
+    if (cache_ && NeedInitCache()) {
+        auto read_ranges = readers_[0]->PreBufferRange();
+        if (read_ranges.ok()) {
+            std::vector<ByteRange> ranges;
+            for (const auto& read_range : read_ranges.value()) {
+                ranges.emplace_back(read_range.first, read_range.second);
+            }
+            auto s = cache_->Init(std::move(ranges));
+            if (!s.ok()) {
+                SetReadStatus(s);
+            }
+        } else {
+            SetReadStatus(read_ranges.status());
+        }
+    }
+
+    while (true) {
+        if (!GetReadStatus().ok()) {
+            break;
+        }
+        if (is_shutdown_) {
+            break;
+        }
+        // A position of uint64 max marks a reader that has delivered its EOF batch.
+        bool all_finished = true;
+        for (const auto& reader_pos : readers_pos_) {
+            if (reader_pos->load() != std::numeric_limits<uint64_t>::max()) {
+                all_finished = false;
+            }
+        }
+        if (all_finished) {
+            break;
+        }
+
+        // Dispatch a new read for every reader whose previous task (if any) has completed.
+        bool made_progress_this_iteration = false;
+        for (size_t reader_idx = 0; reader_idx < readers_.size(); reader_idx++) {
+            if (!futures[reader_idx].valid() ||
+                (futures[reader_idx].wait_for(std::chrono::microseconds(0)) ==
+                 std::future_status::ready)) {
+                if (futures[reader_idx].valid()) {
+                    futures[reader_idx].get();
+                }
+                if (prefetch_queues_[reader_idx]->size() >= prefetch_queue_capacity_) {
+                    // queue is full, skip
+                    continue;
+                }
+                if (readers_pos_[reader_idx]->load() != std::numeric_limits<uint64_t>::max()) {
+                    futures[reader_idx] =
+                        Via(executor_.get(), [this, reader_idx]() { ReadBatch(reader_idx); });
+                    made_progress_this_iteration = true;
+                }
+            }
+        }
+        // Nothing dispatched: sleep until shutdown or until some reader is idle, has queue room
+        // and is not at EOF. Workers and the consumer notify cv_ under working_mutex_.
+        // NOTE(review): the predicate does not look at `futures`, so a task that was dispatched
+        // but has not yet set reader_is_working_ looks idle; the loop may then spin briefly.
+        if (!made_progress_this_iteration) {
+            std::unique_lock<std::mutex> lock(working_mutex_);
+            cv_.wait(lock, [this] {
+                if (is_shutdown_) {
+                    return true;
+                }
+                for (size_t i = 0; i < reader_is_working_.size(); i++) {
+                    if (reader_is_working_[i]) {
+                        continue;
+                    }
+                    if (prefetch_queues_[i]->size() >= prefetch_queue_capacity_) {
+                        continue;
+                    }
+                    if (readers_pos_[i]->load() == std::numeric_limits<uint64_t>::max()) {
+                        continue;
+                    }
+                    return true;
+                }
+                return false;
+            });
+        }
+    }
+    // Presumably blocks until every dispatched task has finished, so no task outlives the
+    // thread (CleanUp() relies on joining this thread) — TODO confirm Wait() semantics.
+    Wait(futures);
+}
+
+// Worker-task entry point: performs one read step and records any failure in the shared read
+// status, which both the work loop and NextBatchWithBitmap() check.
+void PrefetchFileBatchReaderImpl::ReadBatch(size_t reader_idx) {
+    if (Status read_result = DoReadBatch(reader_idx); !read_result.ok()) {
+        SetReadStatus(read_result);
+    }
+}
+
+// Returns the first range assigned to `reader_idx` whose end lies beyond that reader's current
+// position, or std::nullopt when the reader has passed all of its ranges.
+std::optional<std::pair<uint64_t, uint64_t>> PrefetchFileBatchReaderImpl::GetCurrentReadRange(
+    size_t reader_idx) const {
+    const auto& assigned_ranges = read_ranges_in_group_[reader_idx];
+    const uint64_t position = readers_pos_[reader_idx]->load();
+    const auto match = std::find_if(
+        assigned_ranges.begin(), assigned_ranges.end(),
+        [position](const std::pair<uint64_t, uint64_t>& range) { return position < range.second; });
+    if (match == assigned_ranges.end()) {
+        return std::nullopt;
+    }
+    return *match;
+}
+
+// Positions the underlying reader at max(recorded position, start of the current range),
+// seeking only when the reader is not already there.
+Status PrefetchFileBatchReaderImpl::EnsureReaderPosition(
+    size_t reader_idx, const std::pair<uint64_t, uint64_t>& current_read_range) const {
+    const uint64_t recorded_pos = readers_pos_[reader_idx]->load();
+    const uint64_t target_row =
+        recorded_pos > current_read_range.first ? recorded_pos : current_read_range.first;
+    const auto& reader = readers_[reader_idx];
+    if (reader->GetNextRowToRead() == target_row) {
+        return Status::OK();
+    }
+    return reader->SeekToRow(target_row);
+}
+
+// Post-processes one batch read by reader `reader_idx` for `read_range`:
+//  - a batch starting at/after the range end is dropped;
+//  - a batch crossing the range end is sliced to the range and its bitmap trimmed;
+//  - a batch fully inside the range is kept as-is.
+// Kept batches with a non-empty bitmap are pushed to the reader's prefetch queue. The reader's
+// position (readers_pos_) is advanced in every case. An EOF batch is queued under the EOF
+// sentinel range and marks the reader finished (position = uint64 max).
+Status PrefetchFileBatchReaderImpl::HandleReadResult(
+    size_t reader_idx, const std::pair<uint64_t, uint64_t>& read_range,
+    ReadBatchWithBitmap&& read_batch_with_bitmap) {
+    PAIMON_ASSIGN_OR_RAISE(uint64_t first_row_number,
+                           readers_[reader_idx]->GetPreviousBatchFirstRowNumber());
+    auto& prefetch_queue = prefetch_queues_[reader_idx];
+    if (!BatchReader::IsEofBatch(read_batch_with_bitmap)) {
+        auto& [read_batch, bitmap] = read_batch_with_bitmap;
+        auto& [c_array, c_schema] = read_batch;
+
+        if (first_row_number >= read_range.second) {
+            // fully out of range, data before first_row_number has been filtered out
+            readers_pos_[reader_idx]->store(first_row_number);
+            ReaderUtils::ReleaseReadBatch(std::move(read_batch));
+            return Status::OK();
+        } else if (first_row_number + c_array->length > read_range.second) {
+            // partially out of range, data before read_range.second has been effectively consumed
+            readers_pos_[reader_idx]->store(read_range.second);
+            // Import consumes c_array/c_schema; the sliced array is exported back into them.
+            PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(std::shared_ptr<arrow::Array> src_array,
+                                              arrow::ImportArray(c_array.get(), c_schema.get()));
+            // NOTE(review): the row count is narrowed to int32_t here.
+            int32_t target_length = read_range.second - first_row_number;
+            auto array = src_array->Slice(/*offset=*/0, target_length);
+            PAIMON_RETURN_NOT_OK_FROM_ARROW(
+                arrow::ExportArray(*array, c_array.get(), c_schema.get()));
+            // Presumably the bitmap holds batch-relative row indices; drop the sliced-off tail.
+            bitmap.RemoveRange(target_length, src_array->length());
+        } else {
+            // all within the range, data before readers_[reader_idx]->GetNextRowToRead() has been
+            // effectively consumed
+            readers_pos_[reader_idx]->store(readers_[reader_idx]->GetNextRowToRead());
+        }
+        // No selected rows left in this batch: release it instead of queueing it.
+        if (bitmap.IsEmpty()) {
+            ReaderUtils::ReleaseReadBatch(std::move(read_batch));
+            return Status::OK();
+        }
+        prefetch_queue->push({read_range, std::move(read_batch_with_bitmap), first_row_number});
+    } else {
+        std::pair<uint64_t, uint64_t> eof_range;
+        PAIMON_ASSIGN_OR_RAISE(eof_range, EofRange());
+        prefetch_queue->push({eof_range, std::move(read_batch_with_bitmap), first_row_number});
+        readers_pos_[reader_idx]->store(std::numeric_limits<uint64_t>::max());
+    }
+    return Status::OK();
+}
+
+// One read step for a single underlying reader: find its current read range, position the
+// reader on it, read one batch, and hand the result to HandleReadResult(). Returns OK without
+// reading when shutdown was requested or the reader has no range left.
+Status PrefetchFileBatchReaderImpl::DoReadBatch(size_t reader_idx) {
+    PAIMON_RETURN_NOT_OK(GetReadStatus());
+    if (is_shutdown_) {
+        return Status::OK();
+    }
+    const std::optional<std::pair<uint64_t, uint64_t>> range_to_read =
+        GetCurrentReadRange(reader_idx);
+    if (!range_to_read.has_value()) {
+        // No more read ranges for this reader, gracefully exit.
+        return Status::OK();
+    }
+
+    // Flag the reader as busy for this step. The guard clears the flag and wakes the work loop
+    // on every exit path.
+    ScopeGuard working_flag_guard([&]() {
+        std::unique_lock<std::mutex> lock(working_mutex_);
+        reader_is_working_[reader_idx] = false;
+        cv_.notify_one();
+    });
+    {
+        std::unique_lock<std::mutex> lock(working_mutex_);
+        reader_is_working_[reader_idx] = true;
+    }
+
+    PAIMON_RETURN_NOT_OK(EnsureReaderPosition(reader_idx, *range_to_read));
+    PAIMON_ASSIGN_OR_RAISE(ReadBatchWithBitmap batch_with_bitmap,
+                           readers_[reader_idx]->NextBatchWithBitmap());
+    return HandleReadResult(reader_idx, *range_to_read, std::move(batch_with_bitmap));
+}
+
+// Returns the next batch in the order of read_ranges_, merging the per-reader prefetch queues.
+// Starts the background thread on first use. The EOF batch is returned once every queue's front
+// is an EOF sentinel. Requires read ranges to have been refreshed (RefreshReadRanges()).
+Result<BatchReader::ReadBatchWithBitmap> PrefetchFileBatchReaderImpl::NextBatchWithBitmap() {
+    if (!read_ranges_freshed_) {
+        return Status::Invalid("prefetch reader read ranges are not initialized");
+    }
+    if (!background_thread_) {
+        background_thread_ =
+            std::make_unique<std::thread>(&PrefetchFileBatchReaderImpl::Workloop, this);
+    }
+
+    while (true) {
+        PAIMON_RETURN_NOT_OK(GetReadStatus());
+        if (is_shutdown_) {
+            return Status::Invalid(
+                "prefetch reader has inconsistent state, maybe read while closing reader or change "
+                "read schema");
+        }
+        // Scan the queue fronts: track the smallest front range, count non-empty and EOF queues,
+        // and pop the first front that belongs to the current (head) read range.
+        std::optional<std::pair<uint64_t, uint64_t>> min_range;
+        size_t eof_count = 0;
+        size_t value_count = 0;
+        for (auto& prefetch_queue : prefetch_queues_) {
+            PAIMON_RETURN_NOT_OK(GetReadStatus());
+            // NOTE(review): the pointer stays valid only if ThreadsafeQueue keeps its front
+            // element stable while workers push concurrently — confirm against its implementation.
+            const PrefetchBatch* peek_batch = prefetch_queue->try_front();
+            if (!peek_batch) {
+                continue;
+            }
+            if (min_range == std::nullopt) {
+                min_range = peek_batch->read_range;
+            } else {
+                if (peek_batch->read_range.first < min_range.value().first) {
+                    min_range = peek_batch->read_range;
+                }
+            }
+            value_count++;
+            PAIMON_ASSIGN_OR_RAISE(bool is_eof_range, IsEofRange(peek_batch->read_range));
+            if (is_eof_range) {
+                eof_count++;
+                continue;
+            }
+
+            const auto& current_read_range = read_ranges_.front();
+            if (peek_batch->read_range == current_read_range) {
+                auto prefetch_batch = prefetch_queue->try_pop();
+                // A queue slot was freed; wake the work loop.
+                {
+                    std::unique_lock<std::mutex> lock(working_mutex_);
+                    cv_.notify_one();
+                }
+                previous_batch_first_row_num_ = prefetch_batch.value().previous_batch_first_row_num;
+                return std::move(prefetch_batch).value().batch;
+            }
+        }
+        // Every reader has reached EOF. The EOF batches stay queued, so later calls also see EOF.
+        if (eof_count == prefetch_queues_.size()) {
+            const PrefetchBatch* peek_batch = prefetch_queues_[0]->try_front();
+            if (peek_batch == nullptr) {
+                assert(false);
+                return Status::Invalid("peek batch not suppose to be nullptr");
+            }
+            previous_batch_first_row_num_ = peek_batch->previous_batch_first_row_num;
+            return BatchReader::MakeEofBatchWithBitmap();
+        }
+        // All queues have a front and none matches the head range: every range that starts
+        // before the smallest queued range produced no batch, so skip past those ranges.
+        if (value_count == prefetch_queues_.size()) {
+            while (true) {
+                if (read_ranges_.empty()) {
+                    break;
+                }
+                const auto& current_read_range = read_ranges_.front();
+                if (current_read_range.first < min_range.value().first) {
+                    read_ranges_.pop_front();
+                } else {
+                    break;
+                }
+            }
+        } else {
+            // Some reader has not produced its next batch yet; poll again shortly.
+            // NOTE(review): this is a 1us busy-poll rather than a condition-variable wait.
+            std::this_thread::sleep_for(std::chrono::microseconds(1));
+        }
+    }
+}
+
+// Random access is not offered by the prefetching reader; the underlying readers are positioned
+// from the read ranges instead.
+Status PrefetchFileBatchReaderImpl::SeekToRow(uint64_t /*row_number*/) {
+    return Status::NotImplemented("not support seek to row for prefetch reader");
+}
+
+// Delegates to MetricsImpl::CollectReadMetrics over all underlying readers.
+std::shared_ptr<Metrics> PrefetchFileBatchReaderImpl::GetReaderMetrics() const {
+    auto collected = MetricsImpl::CollectReadMetrics(readers_);
+    return collected;
+}
+
+// Every underlying reader is opened on the same data file, so the first one answers for all.
+Result<std::unique_ptr<::ArrowSchema>> PrefetchFileBatchReaderImpl::GetFileSchema() const {
+    assert(!readers_.empty());
+    const auto& first_reader = readers_.front();
+    return first_reader->GetFileSchema();
+}
+
+// First row number recorded by the most recent NextBatchWithBitmap() call (uint64 max before
+// any batch has been returned).
+Result<uint64_t> PrefetchFileBatchReaderImpl::GetPreviousBatchFirstRowNumber() const {
+    const uint64_t first_row = previous_batch_first_row_num_;
+    return first_row;
+}
+
+// Row count of the data file, taken from the first underlying reader (all read the same file).
+Result<uint64_t> PrefetchFileBatchReaderImpl::GetNumberOfRows() const {
+    assert(!readers_.empty());
+    const auto& first_reader = readers_.front();
+    return first_reader->GetNumberOfRows();
+}
+
+// Not meaningful for the prefetching reader (each underlying reader tracks its own position).
+// Asserts in debug builds; otherwise returns the all-ones uint64 sentinel, spelled explicitly
+// instead of relying on an implicit signed-to-unsigned conversion of -1.
+uint64_t PrefetchFileBatchReaderImpl::GetNextRowToRead() const {
+    assert(false);
+    return std::numeric_limits<uint64_t>::max();
+}
+
+// Records the shared read status under an exclusive lock on rw_mutex_.
+void PrefetchFileBatchReaderImpl::SetReadStatus(const Status& status) {
+    std::lock_guard<std::shared_mutex> write_lock(rw_mutex_);
+    read_status_ = status;
+}
+
+// Returns a copy of the shared read status, taken under a shared lock on rw_mutex_.
+Status PrefetchFileBatchReaderImpl::GetReadStatus() const {
+    std::shared_lock<std::shared_mutex> read_lock(rw_mutex_);
+    Status current = read_status_;
+    return current;
+}
+// A range is treated as the EOF sentinel when it starts at or beyond the file's row count.
+Result<bool> PrefetchFileBatchReaderImpl::IsEofRange(
+    const std::pair<uint64_t, uint64_t>& read_range) const {
+    PAIMON_ASSIGN_OR_RAISE(uint64_t row_count, GetNumberOfRows());
+    const bool starts_past_end = read_range.first >= row_count;
+    return starts_past_end;
+}
+
+// The EOF sentinel: a one-row range [row_count, row_count + 1) just past the file's last row.
+Result<std::pair<uint64_t, uint64_t>> PrefetchFileBatchReaderImpl::EofRange() const {
+    PAIMON_ASSIGN_OR_RAISE(uint64_t row_count, GetNumberOfRows());
+    return std::pair<uint64_t, uint64_t>{row_count, row_count + 1};
+}
+
+// Stops prefetching first (CleanUp() joins the background thread, which waits for its worker
+// tasks), then closes every underlying reader. CleanUp()'s status is intentionally ignored.
+void PrefetchFileBatchReaderImpl::Close() {
+    static_cast<void>(CleanUp());
+    std::for_each(readers_.begin(), readers_.end(),
+                  [](const std::shared_ptr<PrefetchFileBatchReader>& reader) { reader->Close(); });
+}
+
+}  // namespace paimon
diff --git a/src/paimon/common/reader/prefetch_file_batch_reader_impl.h 
b/src/paimon/common/reader/prefetch_file_batch_reader_impl.h
new file mode 100644
index 0000000..f2916da
--- /dev/null
+++ b/src/paimon/common/reader/prefetch_file_batch_reader_impl.h
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#pragma once
+
+#include <atomic>
+#include <cassert>
+#include <condition_variable>
+#include <cstddef>
+#include <cstdint>
+#include <deque>
+#include <limits>
+#include <memory>
+#include <mutex>
+#include <optional>
+#include <shared_mutex>
+#include <string>
+#include <thread>
+#include <utility>
+#include <vector>
+
+#include "arrow/c/abi.h"
+#include "paimon/common/utils/threadsafe_queue.h"
+#include "paimon/reader/batch_reader.h"
+#include "paimon/reader/prefetch_file_batch_reader.h"
+#include "paimon/result.h"
+#include "paimon/status.h"
+#include "paimon/utils/read_ahead_cache.h"
+#include "paimon/utils/roaring_bitmap32.h"
+
+struct ArrowSchema;
+
+namespace paimon {
+
+class ReaderBuilder;
+class FileSystem;
+class Executor;
+class Predicate;
+class Metrics;
+
+/// Batch reader that prefetches a single data file in the background.
+///
+/// Several underlying PrefetchFileBatchReader instances are opened on the same file. Read ranges
+/// are distributed round-robin across them, and a background thread (Workloop) schedules their
+/// reads on an Executor. The produced batches are buffered in one queue per reader, and
+/// NextBatchWithBitmap() hands them out in the order of the read-range list.
+class PrefetchFileBatchReaderImpl : public PrefetchFileBatchReader {
+ public:
+    /// Creates a prefetching reader over `data_file_path`.
+    ///
+    /// @param data_file_path file to read; every underlying reader opens it through `fs`.
+    /// @param reader_builder builds each underlying reader; must not be null.
+    /// @param fs file system used to open the file; must not be null.
+    /// @param prefetch_max_parallel_num number of underlying readers; must be > 0.
+    /// @param batch_size rows per batch; must be > 0.
+    /// @param prefetch_batch_count total number of buffered batches, split evenly across the
+    ///        readers (raised to the reader count when smaller); must be > 0.
+    /// @param enable_adaptive_prefetch_strategy let the size of the first read range decide
+    ///        whether prefetching is used.
+    /// @param executor runs reader construction and batch reads; must not be null.
+    /// @param initialize_read_ranges compute read ranges immediately; normally false because
+    ///        SetReadSchema() refreshes them before reading.
+    /// @param prefetch_cache_mode controls whether the shared read-ahead cache is created and
+    ///        when it is initialized.
+    /// @param cache_config configuration of the shared read-ahead cache.
+    /// @param pool memory pool for the read-ahead cache.
+    /// @return Invalid when an argument is invalid or the file format does not support prefetch.
+    static Result<std::unique_ptr<PrefetchFileBatchReaderImpl>> Create(
+        const std::string& data_file_path, const ReaderBuilder* reader_builder,
+        const std::shared_ptr<FileSystem>& fs, uint32_t prefetch_max_parallel_num,
+        int32_t batch_size, uint32_t prefetch_batch_count, bool enable_adaptive_prefetch_strategy,
+        const std::shared_ptr<Executor>& executor, bool initialize_read_ranges,
+        PrefetchCacheMode prefetch_cache_mode, const CacheConfig& cache_config,
+        const std::shared_ptr<MemoryPool>& pool);
+
+    ~PrefetchFileBatchReaderImpl() override;
+
+    /// Unsupported; callers must use NextBatchWithBitmap().
+    Result<FileBatchReader::ReadBatch> NextBatch() override {
+        return Status::Invalid(
+            "paimon inner reader PrefetchFileBatchReader should use NextBatchWithBitmap");
+    }
+    /// Returns the next prefetched batch in read-range order; starts the background thread on
+    /// first use. Fails if the read ranges have not been refreshed.
+    Result<FileBatchReader::ReadBatchWithBitmap> NextBatchWithBitmap() override;
+
+    std::shared_ptr<Metrics> GetReaderMetrics() const override;
+
+    Result<std::unique_ptr<::ArrowSchema>> GetFileSchema() const override;
+    /// Applies the schema, predicate and bitmap to all underlying readers and refreshes the
+    /// read ranges.
+    Status SetReadSchema(::ArrowSchema* read_schema, const std::shared_ptr<Predicate>& predicate,
+                         const std::optional<RoaringBitmap32>& selection_bitmap) override;
+
+    /// Not supported by the prefetching reader.
+    Status SeekToRow(uint64_t row_number) override;
+    Result<uint64_t> GetPreviousBatchFirstRowNumber() const override;
+    Result<uint64_t> GetNumberOfRows() const override;
+    /// Not meaningful for the prefetching reader; asserts in debug builds.
+    uint64_t GetNextRowToRead() const override;
+    void Close() override;
+    Status SetReadRanges(const std::vector<std::pair<uint64_t, uint64_t>>& read_ranges) override;
+
+    Result<std::vector<std::pair<uint64_t, uint64_t>>> GenReadRanges(
+        bool* need_prefetch) const override {
+        assert(false);
+        return Status::NotImplemented("gen read ranges not implemented");
+    }
+    bool SupportPreciseBitmapSelection() const override {
+        return readers_[0]->SupportPreciseBitmapSelection();
+    }
+
+    /// Stops prefetching and recomputes the read ranges from the first underlying reader.
+    Status RefreshReadRanges();
+
+    inline PrefetchFileBatchReader* GetFirstReader() const {
+        return readers_[0].get();
+    }
+
+    /// Whether the last RefreshReadRanges() decided that prefetching is worthwhile.
+    inline bool NeedPrefetch() const {
+        return need_prefetch_;
+    }
+
+ private:
+    /// One queued batch together with the read range it belongs to.
+    struct PrefetchBatch {
+        std::pair<uint64_t, uint64_t> read_range;
+        BatchReader::ReadBatchWithBitmap batch;
+        uint64_t previous_batch_first_row_num;
+    };
+
+    PrefetchFileBatchReaderImpl(
+        const std::vector<std::shared_ptr<PrefetchFileBatchReader>>& readers, int32_t batch_size,
+        uint32_t prefetch_queue_capacity, bool enable_adaptive_prefetch_strategy,
+        const std::shared_ptr<Executor>& executor, const std::shared_ptr<ReadAheadCache>& cache,
+        PrefetchCacheMode cache_mode);
+
+    Status CleanUp();
+    void Workloop();
+    void SetReadStatus(const Status& status);
+    Status GetReadStatus() const;
+    Result<bool> IsEofRange(const std::pair<uint64_t, uint64_t>& read_range) const;
+    Status DoReadBatch(size_t reader_idx);
+    void ReadBatch(size_t reader_idx);
+    static std::vector<std::pair<uint64_t, uint64_t>> FilterReadRanges(
+        const std::vector<std::pair<uint64_t, uint64_t>>& read_ranges,
+        const std::optional<RoaringBitmap32>& selection_bitmap);
+
+    static std::vector<std::vector<std::pair<uint64_t, uint64_t>>> DispatchReadRanges(
+        const std::vector<std::pair<uint64_t, uint64_t>>& read_ranges, size_t group_count);
+
+    Result<std::pair<uint64_t, uint64_t>> EofRange() const;
+    std::optional<std::pair<uint64_t, uint64_t>> GetCurrentReadRange(size_t reader_idx) const;
+    Status EnsureReaderPosition(size_t reader_idx,
+                                const std::pair<uint64_t, uint64_t>& read_range) const;
+    Status HandleReadResult(size_t reader_idx, const std::pair<uint64_t, uint64_t>& read_range,
+                            FileBatchReader::ReadBatchWithBitmap&& read_batch_with_bitmap);
+    bool NeedInitCache() const;
+
+ private:
+    // Underlying readers, all opened on the same data file.
+    std::vector<std::shared_ptr<PrefetchFileBatchReader>> readers_;
+    // The meaning of readers_pos_ is: all data before this pos has been filtered out or
+    // effectively consumed, and the data after this pos may need to be read in the next round of
+    // reading. uint64 max marks a reader that has reached EOF.
+    std::vector<std::unique_ptr<std::atomic<uint64_t>>> readers_pos_;
+    const int32_t batch_size_;
+    // Bitmap and predicate from the last SetReadSchema() call.
+    std::optional<RoaringBitmap32> selection_bitmap_;
+    std::shared_ptr<Predicate> predicate_;
+    // Global ordered read ranges, ending with the EOF sentinel; consumed from the front.
+    std::deque<std::pair<uint64_t, uint64_t>> read_ranges_;
+    // Per-reader read ranges (round-robin split), each ending with the EOF sentinel.
+    std::vector<std::vector<std::pair<uint64_t, uint64_t>>> read_ranges_in_group_;
+    // One queue of prefetched batches per underlying reader.
+    std::vector<std::unique_ptr<ThreadsafeQueue<PrefetchBatch>>> prefetch_queues_;
+    // Per-reader "read in progress" flags; written under working_mutex_ while prefetching runs.
+    std::vector<bool> reader_is_working_;
+    std::mutex working_mutex_;
+    // Wakes the work loop when a reader finishes, a queue slot frees up, or shutdown begins.
+    std::condition_variable cv_;
+    std::shared_ptr<Executor> executor_;
+    std::shared_ptr<ReadAheadCache> cache_;
+    PrefetchCacheMode cache_mode_;
+
+    // Guards read_status_.
+    mutable std::shared_mutex rw_mutex_;
+    std::unique_ptr<std::thread> background_thread_;
+    Status read_status_;
+    std::atomic<bool> is_shutdown_ = false;
+    uint64_t previous_batch_first_row_num_ = std::numeric_limits<uint64_t>::max();
+    bool need_prefetch_ = false;
+    // True once read ranges have been installed; NextBatchWithBitmap() refuses to run otherwise.
+    bool read_ranges_freshed_ = false;
+    // Maximum number of batches buffered per reader queue.
+    const uint32_t prefetch_queue_capacity_;
+    const bool enable_adaptive_prefetch_strategy_;
+    // Number of underlying readers.
+    int32_t parallel_num_;
+};
+}  // namespace paimon
diff --git a/src/paimon/common/reader/prefetch_file_batch_reader_impl_test.cpp 
b/src/paimon/common/reader/prefetch_file_batch_reader_impl_test.cpp
new file mode 100644
index 0000000..e9832c1
--- /dev/null
+++ b/src/paimon/common/reader/prefetch_file_batch_reader_impl_test.cpp
@@ -0,0 +1,914 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#include "paimon/common/reader/prefetch_file_batch_reader_impl.h"
+
+#include <set>
+
+#include "arrow/compute/api.h"
+#include "arrow/ipc/api.h"
+#include "gtest/gtest.h"
+#include "paimon/common/utils/path_util.h"
+#include "paimon/executor.h"
+#include "paimon/format/file_format.h"
+#include "paimon/format/file_format_factory.h"
+#include "paimon/format/format_writer.h"
+#include "paimon/fs/file_system_factory.h"
+#include "paimon/fs/local/local_file_system.h"
+#include "paimon/predicate/predicate_builder.h"
+#include "paimon/testing/mock/mock_file_batch_reader.h"
+#include "paimon/testing/mock/mock_file_system.h"
+#include "paimon/testing/mock/mock_format_reader_builder.h"
+#include "paimon/testing/utils/read_result_collector.h"
+#include "paimon/testing/utils/testharness.h"
+#include "paimon/utils/read_ahead_cache.h"
+
+namespace paimon::test {
+
+class ControlledMockFileBatchReader : public MockFileBatchReader {
+ public:
+    ControlledMockFileBatchReader(const std::shared_ptr<arrow::Array>& data,
+                                  const std::shared_ptr<arrow::DataType>& 
file_schema,
+                                  int32_t read_batch_size,
+                                  std::vector<std::pair<uint64_t, uint64_t>> 
read_ranges,
+                                  bool need_prefetch, Status 
set_read_ranges_status = Status::OK())
+        : MockFileBatchReader(data, file_schema, read_batch_size),
+          read_ranges_(std::move(read_ranges)),
+          need_prefetch_(need_prefetch),
+          set_read_ranges_status_(std::move(set_read_ranges_status)) {}
+
+    Result<std::vector<std::pair<uint64_t, uint64_t>>> GenReadRanges(
+        bool* need_prefetch) const override {
+        *need_prefetch = need_prefetch_;
+        return read_ranges_;
+    }
+
+    Status SetReadRanges(const std::vector<std::pair<uint64_t, uint64_t>>& 
read_ranges) override {
+        if (!set_read_ranges_status_.ok()) {
+            return set_read_ranges_status_;
+        }
+        return MockFileBatchReader::SetReadRanges(read_ranges);
+    }
+
+ private:
+    std::vector<std::pair<uint64_t, uint64_t>> read_ranges_;
+    bool need_prefetch_ = true;
+    Status set_read_ranges_status_;
+};
+
+class ControlledMockFormatReaderBuilder : public ReaderBuilder {
+ public:
+    ControlledMockFormatReaderBuilder(const std::shared_ptr<arrow::Array>& 
data,
+                                      const std::shared_ptr<arrow::DataType>& 
schema,
+                                      int32_t read_batch_size,
+                                      std::vector<std::pair<uint64_t, 
uint64_t>> read_ranges,
+                                      bool need_prefetch,
+                                      std::vector<Status> 
set_read_ranges_statuses)
+        : data_(data),
+          schema_(schema),
+          read_batch_size_(read_batch_size),
+          read_ranges_(std::move(read_ranges)),
+          need_prefetch_(need_prefetch),
+          set_read_ranges_statuses_(std::move(set_read_ranges_statuses)) {}
+
+    ReaderBuilder* WithMemoryPool(const std::shared_ptr<MemoryPool>& pool) 
override {
+        return this;
+    }
+
+    Result<std::unique_ptr<FileBatchReader>> Build(
+        const std::shared_ptr<InputStream>& path) const override {
+        size_t index = build_count_++;
+        Status set_read_ranges_status = index < 
set_read_ranges_statuses_.size()
+                                            ? set_read_ranges_statuses_[index]
+                                            : Status::OK();
+        return std::make_unique<ControlledMockFileBatchReader>(
+            data_, schema_, read_batch_size_, read_ranges_, need_prefetch_, 
set_read_ranges_status);
+    }
+
+    Result<std::unique_ptr<FileBatchReader>> Build(const std::string& path) 
const override {
+        return Status::Invalid("do not support build reader with path in mock 
format");
+    }
+
+ private:
+    std::shared_ptr<arrow::Array> data_;
+    std::shared_ptr<arrow::DataType> schema_;
+    int32_t read_batch_size_ = 0;
+    std::vector<std::pair<uint64_t, uint64_t>> read_ranges_;
+    bool need_prefetch_ = true;
+    std::vector<Status> set_read_ranges_statuses_;
+    mutable size_t build_count_ = 0;
+};
+
+struct TestParam {
+    std::string file_format;
+    PrefetchCacheMode cache_mode;
+};
+
+class PrefetchFileBatchReaderImplTest : public ::testing::Test,
+                                        public 
::testing::WithParamInterface<TestParam> {
+ public:
+    void SetUp() override {
+        fields_ = {arrow::field("f0", arrow::utf8()), arrow::field("f1", 
arrow::int64()),
+                   arrow::field("f2", arrow::boolean())};
+        data_type_ = arrow::struct_(fields_);
+        mock_fs_ = std::make_shared<MockFileSystem>();
+        local_fs_ = std::make_shared<LocalFileSystem>();
+        executor_ = CreateDefaultExecutor(/*thread_count=*/2);
+        dir_ = ::paimon::test::UniqueTestDirectory::Create();
+        ASSERT_TRUE(dir_);
+    }
+    void TearDown() override {}
+
+    std::shared_ptr<arrow::Array> PrepareArray(int32_t length, int32_t offset 
= 0) {
+        arrow::StructBuilder struct_builder(
+            data_type_, arrow::default_memory_pool(),
+            {std::make_shared<arrow::StringBuilder>(), 
std::make_shared<arrow::Int64Builder>(),
+             std::make_shared<arrow::BooleanBuilder>()});
+        auto string_builder = 
static_cast<arrow::StringBuilder*>(struct_builder.field_builder(0));
+        auto big_int_builder = 
static_cast<arrow::Int64Builder*>(struct_builder.field_builder(1));
+        auto bool_builder = 
static_cast<arrow::BooleanBuilder*>(struct_builder.field_builder(2));
+        for (int32_t i = 0 + offset; i < length + offset; ++i) {
+            EXPECT_TRUE(struct_builder.Append().ok());
+            EXPECT_TRUE(string_builder->Append("str_" + 
std::to_string(i)).ok());
+            EXPECT_TRUE(big_int_builder->Append(i).ok());
+            EXPECT_TRUE(bool_builder->Append(static_cast<bool>(i % 2)).ok());
+        }
+        std::shared_ptr<arrow::Array> array;
+        EXPECT_TRUE(struct_builder.Finish(&array).ok());
+        return array;
+    }
+
+    void PrepareTestData(const std::string& file_format_str,
+                         const std::shared_ptr<arrow::Array>& array, int32_t 
stripe_row_count,
+                         int32_t row_index_stride) const {
+        // for simple case, assume that array.length() %  row_index_stride == 0
+        ASSERT_EQ(array->length() % row_index_stride, 0);
+        arrow::Schema schema(array->type()->fields());
+        ::ArrowSchema c_schema;
+        ASSERT_TRUE(arrow::ExportSchema(schema, &c_schema).ok());
+        ASSERT_OK_AND_ASSIGN(
+            std::unique_ptr<FileFormat> file_format,
+            FileFormatFactory::Get(
+                file_format_str,
+                {{"parquet.write.max-row-group-length", 
std::to_string(row_index_stride)},
+                 {"orc.row.index.stride", std::to_string(row_index_stride)}}));
+
+        ASSERT_OK_AND_ASSIGN(auto writer_builder,
+                             file_format->CreateWriterBuilder(&c_schema, 
1024));
+        ASSERT_OK_AND_ASSIGN(
+            std::shared_ptr<OutputStream> out,
+            local_fs_->Create(PathUtil::JoinPath(dir_->Str(), "file." + 
file_format_str),
+                              /*overwrite=*/false));
+        ASSERT_OK_AND_ASSIGN(auto writer, writer_builder->Build(out, "zstd"));
+
+        int32_t write_batch_count = array->length() / row_index_stride;
+        for (int32_t i = 0; i < write_batch_count; i++) {
+            auto slice = array->Slice(i * row_index_stride, row_index_stride);
+            auto copied_array = arrow::Concatenate({slice}).ValueOr(nullptr);
+            ASSERT_TRUE(copied_array);
+            ::ArrowArray c_array;
+            ASSERT_TRUE(arrow::ExportArray(*copied_array, &c_array).ok());
+            ASSERT_OK(writer->AddBatch(&c_array));
+        }
+        ASSERT_OK(writer->Flush());
+        ASSERT_OK(writer->Finish());
+        ASSERT_OK(out->Flush());
+        ASSERT_OK(out->Close());
+    }
+
+    std::unique_ptr<PrefetchFileBatchReaderImpl> PreparePrefetchReader(
+        const std::string& file_format_str, const arrow::Schema* read_schema,
+        const std::shared_ptr<Predicate>& predicate,
+        const std::optional<RoaringBitmap32>& selection_bitmap, int32_t 
batch_size,
+        int32_t prefetch_max_parallel_num, PrefetchCacheMode cache_mode) const 
{
+        EXPECT_OK_AND_ASSIGN(std::unique_ptr<FileFormat> file_format,
+                             FileFormatFactory::Get(file_format_str, {}));
+        EXPECT_OK_AND_ASSIGN(auto reader_builder, 
file_format->CreateReaderBuilder(batch_size));
+        EXPECT_OK_AND_ASSIGN(
+            std::unique_ptr<PrefetchFileBatchReaderImpl> reader,
+            PrefetchFileBatchReaderImpl::Create(
+                PathUtil::JoinPath(dir_->Str(), "file." + 
file_format->Identifier()),
+                reader_builder.get(), local_fs_, prefetch_max_parallel_num, 
batch_size,
+                prefetch_max_parallel_num * 2, 
/*enable_adaptive_prefetch_strategy=*/false,
+                CreateDefaultExecutor(prefetch_max_parallel_num - 1),
+                /*initialize_read_ranges=*/false, cache_mode, CacheConfig(), 
GetDefaultPool()));
+        std::unique_ptr<ArrowSchema> c_schema = 
std::make_unique<ArrowSchema>();
+        auto arrow_status = arrow::ExportSchema(*read_schema, c_schema.get());
+        EXPECT_TRUE(arrow_status.ok());
+        EXPECT_OK(reader->SetReadSchema(c_schema.get(), predicate, 
selection_bitmap));
+        return reader;
+    }
+
+    bool HasValue(const std::vector<
+                  
std::unique_ptr<ThreadsafeQueue<PrefetchFileBatchReaderImpl::PrefetchBatch>>>&
+                      prefetch_queues) {
+        for (const auto& queue : prefetch_queues) {
+            if (!queue->empty()) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    bool CheckEqual(const std::shared_ptr<arrow::ChunkedArray>& lhs,
+                    const std::shared_ptr<arrow::ChunkedArray>& rhs) {
+        std::string lhs_str, rhs_str;
+        auto print_option = arrow::PrettyPrintOptions::Defaults();
+        print_option.window = 1000;
+        print_option.container_window = 1000;
+        EXPECT_TRUE(arrow::PrettyPrint(*lhs, print_option, &lhs_str).ok());
+        EXPECT_TRUE(arrow::PrettyPrint(*rhs, print_option, &rhs_str).ok());
+        bool is_equal = lhs->Equals(rhs);
+        if (!is_equal) {
+            std::cout << "lhs array: " << lhs_str << ", rhs array: " << 
rhs_str;
+        }
+        return is_equal;
+    }
+
+ private:
+    arrow::FieldVector fields_;
+    std::shared_ptr<arrow::DataType> data_type_;
+    std::shared_ptr<FileSystem> mock_fs_;
+    std::shared_ptr<FileSystem> local_fs_;
+    std::unique_ptr<paimon::test::UniqueTestDirectory> dir_;
+    std::shared_ptr<Executor> executor_;
+};
+
+std::vector<TestParam> PrepareTestParam() {
+    std::vector<TestParam> values = {
+        TestParam{"parquet", PrefetchCacheMode::ALWAYS},
+        TestParam{"parquet", PrefetchCacheMode::EXCLUDE_BITMAP},
+        TestParam{"parquet", PrefetchCacheMode::EXCLUDE_PREDICATE},
+        TestParam{"parquet", PrefetchCacheMode::EXCLUDE_BITMAP_OR_PREDICATE},
+        TestParam{"parquet", PrefetchCacheMode::NEVER}};
+#ifdef PAIMON_ENABLE_ORC
+    values.emplace_back(TestParam{"orc", PrefetchCacheMode::ALWAYS});
+    values.emplace_back(TestParam{"orc", PrefetchCacheMode::EXCLUDE_BITMAP});
+    values.emplace_back(TestParam{"orc", 
PrefetchCacheMode::EXCLUDE_PREDICATE});
+    values.emplace_back(TestParam{"orc", 
PrefetchCacheMode::EXCLUDE_BITMAP_OR_PREDICATE});
+    values.emplace_back(TestParam{"orc", PrefetchCacheMode::NEVER});
+#endif
+    return values;
+}
+
+INSTANTIATE_TEST_SUITE_P(TestParam, PrefetchFileBatchReaderImplTest,
+                         ::testing::ValuesIn(PrepareTestParam()));
+
+TEST_F(PrefetchFileBatchReaderImplTest, TestSimple) {
+    auto data_array = PrepareArray(101);
+    int32_t batch_size = 10;
+    for (auto prefetch_max_parallel_num : {1, 2, 3, 5, 8, 10}) {
+        MockFormatReaderBuilder reader_builder(data_array, data_type_, 
batch_size);
+        ASSERT_OK_AND_ASSIGN(
+            auto reader,
+            PrefetchFileBatchReaderImpl::Create(
+                /*data_file_path=*/"", &reader_builder, mock_fs_, 
prefetch_max_parallel_num,
+                batch_size, prefetch_max_parallel_num * 2,
+                /*enable_adaptive_prefetch_strategy=*/false, executor_,
+                /*initialize_read_ranges=*/true, 
/*prefetch_cache_mode=*/PrefetchCacheMode::ALWAYS,
+                CacheConfig(), GetDefaultPool()));
+        ASSERT_EQ(reader->GetPreviousBatchFirstRowNumber().value(), -1);
+        ASSERT_OK_AND_ASSIGN(auto result_array,
+                             ReadResultCollector::CollectResult(
+                                 reader.get(), /*max simulated data processing 
time*/ 100));
+        ASSERT_EQ(reader->GetPreviousBatchFirstRowNumber().value(), 101);
+        auto expected_array = 
std::make_shared<arrow::ChunkedArray>(data_array);
+        ASSERT_TRUE(result_array->Equals(expected_array));
+    }
+}
+
+TEST_F(PrefetchFileBatchReaderImplTest, TestReadWithLimits) {
+    auto data_array = PrepareArray(101);
+    int32_t batch_size = 10;
+    int32_t prefetch_max_parallel_num = 12;
+
+    MockFormatReaderBuilder reader_builder(data_array, data_type_, batch_size);
+    ASSERT_OK_AND_ASSIGN(
+        auto reader,
+        PrefetchFileBatchReaderImpl::Create(
+            /*data_file_path=*/"", &reader_builder, mock_fs_, 
prefetch_max_parallel_num, batch_size,
+            prefetch_max_parallel_num * 2, 
/*enable_adaptive_prefetch_strategy=*/false, executor_,
+            /*initialize_read_ranges=*/true, 
/*prefetch_cache_mode=*/PrefetchCacheMode::ALWAYS,
+            CacheConfig(), GetDefaultPool()));
+    // simulate read limits, only read 8 batches
+    for (int32_t i = 0; i < 8; i++) {
+        ASSERT_OK_AND_ASSIGN(BatchReader::ReadBatchWithBitmap 
batch_with_bitmap,
+                             reader->NextBatchWithBitmap());
+        auto& [batch, bitmap] = batch_with_bitmap;
+        ASSERT_EQ(batch.first->length, bitmap.Cardinality());
+        ASSERT_OK_AND_ASSIGN(std::shared_ptr<arrow::Array> array,
+                             ReadResultCollector::GetArray(std::move(batch)));
+        ASSERT_TRUE(array);
+    }
+    reader->Close();
+    // test metrics
+    auto read_metrics = reader->GetReaderMetrics();
+    ASSERT_TRUE(read_metrics);
+}
+
+TEST_F(PrefetchFileBatchReaderImplTest, TestReadWithoutInitializeReadRanges) {
+    auto data_array = PrepareArray(101);
+    int32_t batch_size = 10;
+    int32_t prefetch_max_parallel_num = 12;
+
+    MockFormatReaderBuilder reader_builder(data_array, data_type_, batch_size);
+    ASSERT_OK_AND_ASSIGN(
+        auto reader,
+        PrefetchFileBatchReaderImpl::Create(
+            /*data_file_path=*/"", &reader_builder, mock_fs_, 
prefetch_max_parallel_num, batch_size,
+            prefetch_max_parallel_num * 2, 
/*enable_adaptive_prefetch_strategy=*/false, executor_,
+            /*initialize_read_ranges=*/false, 
/*prefetch_cache_mode=*/PrefetchCacheMode::ALWAYS,
+            CacheConfig(), GetDefaultPool()));
+    // simulate read limits, only read 8 batches
+    ASSERT_NOK_WITH_MSG(reader->NextBatchWithBitmap(),
+                        "prefetch reader read ranges are not initialized");
+    reader->Close();
+}
+
+TEST_F(PrefetchFileBatchReaderImplTest, FilterReadRangesWithoutBitmap) {
+    std::vector<std::pair<uint64_t, uint64_t>> read_ranges = {
+        {0, 1000},    {1000, 2000}, {2000, 3000}, {3000, 4000}, {4000, 5000},
+        {5000, 6000}, {6000, 7000}, {7000, 8000}, {8000, 9000}, {9000, 10000}};
+    auto filtered_ranges = 
PrefetchFileBatchReaderImpl::FilterReadRanges(read_ranges, std::nullopt);
+    ASSERT_EQ(filtered_ranges, read_ranges);
+}
+
+TEST_F(PrefetchFileBatchReaderImplTest, FilterReadRangesWithAllZeroBitmap) {
+    std::vector<std::pair<uint64_t, uint64_t>> read_ranges = {
+        {0, 1000},    {1000, 2000}, {2000, 3000}, {3000, 4000}, {4000, 5000},
+        {5000, 6000}, {6000, 7000}, {7000, 8000}, {8000, 9000}, {9000, 10000}};
+    auto bitmap = RoaringBitmap32::From({});
+    auto filtered_ranges = 
PrefetchFileBatchReaderImpl::FilterReadRanges(read_ranges, bitmap);
+    ASSERT_TRUE(filtered_ranges.empty());
+}
+
+TEST_F(PrefetchFileBatchReaderImplTest, FilterReadRangesWithBitmap) {
+    auto data_array = PrepareArray(10000);
+    std::set<int32_t> valid_row_ids;
+    for (int32_t i = 1000; i < 2000; i++) {
+        valid_row_ids.insert(i);
+    }
+    for (int32_t i = 3000; i < 6500; i++) {
+        valid_row_ids.insert(i);
+    }
+    std::vector<int32_t> bitmap_data(valid_row_ids.begin(), 
valid_row_ids.end());
+    auto bitmap = RoaringBitmap32::From(bitmap_data);
+    std::vector<std::pair<uint64_t, uint64_t>> read_ranges = {
+        {0, 1000},    {1000, 2000}, {2000, 3000}, {3000, 4000}, {4000, 5000},
+        {5000, 6000}, {6000, 7000}, {7000, 8000}, {8000, 9000}, {9000, 10000}};
+    auto filtered_ranges = 
PrefetchFileBatchReaderImpl::FilterReadRanges(read_ranges, bitmap);
+    std::vector<std::pair<uint64_t, uint64_t>> expected_filtered_ranges = {
+        {1000, 2000}, {3000, 4000}, {4000, 5000}, {5000, 6000}, {6000, 7000}};
+    ASSERT_EQ(expected_filtered_ranges, filtered_ranges);
+}
+
+TEST_F(PrefetchFileBatchReaderImplTest, DispatchReadRangesEmpty) {
+    std::vector<std::pair<uint64_t, uint64_t>> read_ranges;
+    auto read_ranges_in_group = 
PrefetchFileBatchReaderImpl::DispatchReadRanges(read_ranges, 3);
+    ASSERT_EQ(read_ranges_in_group.size(), 3);
+    ASSERT_TRUE(read_ranges_in_group[0].empty());
+    ASSERT_TRUE(read_ranges_in_group[1].empty());
+    ASSERT_TRUE(read_ranges_in_group[2].empty());
+}
+
+TEST_F(PrefetchFileBatchReaderImplTest, DispatchReadRanges) {
+    std::vector<std::pair<uint64_t, uint64_t>> read_ranges = {
+        {0, 10000}, {10000, 20000}, {20000, 30000}, {30000, 40000}};
+    auto read_ranges_in_group = 
PrefetchFileBatchReaderImpl::DispatchReadRanges(read_ranges, 3);
+    std::vector<std::pair<uint64_t, uint64_t>> expected_group_0 = {{0, 10000}, 
{30000, 40000}};
+    ASSERT_EQ(read_ranges_in_group[0], expected_group_0);
+    std::vector<std::pair<uint64_t, uint64_t>> expected_group_1 = {{10000, 
20000}};
+    ASSERT_EQ(read_ranges_in_group[1], expected_group_1);
+    std::vector<std::pair<uint64_t, uint64_t>> expected_group_2 = {{20000, 
30000}};
+    ASSERT_EQ(read_ranges_in_group[2], expected_group_2);
+}
+
+TEST_F(PrefetchFileBatchReaderImplTest, RefreshReadRanges) {
+    auto data_array = PrepareArray(101);
+    int32_t batch_size = 30;
+    int32_t prefetch_max_parallel_num = 3;
+    MockFormatReaderBuilder reader_builder(data_array, data_type_, batch_size);
+    ASSERT_OK_AND_ASSIGN(
+        auto reader,
+        PrefetchFileBatchReaderImpl::Create(
+            /*data_file_path=*/"", &reader_builder, mock_fs_, 
prefetch_max_parallel_num, batch_size,
+            prefetch_max_parallel_num * 2, 
/*enable_adaptive_prefetch_strategy=*/false, executor_,
+            /*initialize_read_ranges=*/false, 
/*prefetch_cache_mode=*/PrefetchCacheMode::ALWAYS,
+            CacheConfig(), GetDefaultPool()));
+    auto prefetch_reader = 
dynamic_cast<PrefetchFileBatchReaderImpl*>(reader.get());
+    ASSERT_OK(prefetch_reader->RefreshReadRanges());
+    std::vector<std::pair<uint64_t, uint64_t>> read_ranges_0 = {{0, 30}, {90, 
101}};
+    auto mock_reader_0 = 
dynamic_cast<MockFileBatchReader*>(prefetch_reader->readers_[0].get());
+    ASSERT_EQ(mock_reader_0->GetReadRanges(), read_ranges_0);
+    std::vector<std::pair<uint64_t, uint64_t>> read_ranges_1 = {{30, 60}};
+    auto mock_reader_1 = 
dynamic_cast<MockFileBatchReader*>(prefetch_reader->readers_[1].get());
+    ASSERT_EQ(mock_reader_1->GetReadRanges(), read_ranges_1);
+    std::vector<std::pair<uint64_t, uint64_t>> read_ranges_2 = {{60, 90}};
+    auto mock_reader_2 = 
dynamic_cast<MockFileBatchReader*>(prefetch_reader->readers_[2].get());
+    ASSERT_EQ(mock_reader_2->GetReadRanges(), read_ranges_2);
+}
+
+TEST_F(PrefetchFileBatchReaderImplTest, 
RefreshReadRangesDisablePrefetchByAdaptiveStrategy) {
+    auto data_array = PrepareArray(200);
+    int32_t batch_size = 10;
+    int32_t prefetch_max_parallel_num = 1;
+    ControlledMockFormatReaderBuilder reader_builder(data_array, data_type_, 
batch_size,
+                                                     /*read_ranges=*/{{0, 
100}},
+                                                     /*need_prefetch=*/true,
+                                                     
/*set_read_ranges_statuses=*/{});
+
+    ASSERT_OK_AND_ASSIGN(
+        auto reader,
+        PrefetchFileBatchReaderImpl::Create(
+            /*data_file_path=*/"", &reader_builder, mock_fs_, 
prefetch_max_parallel_num, batch_size,
+            /*prefetch_batch_count=*/2,
+            /*enable_adaptive_prefetch_strategy=*/true, executor_,
+            /*initialize_read_ranges=*/true, 
/*prefetch_cache_mode=*/PrefetchCacheMode::ALWAYS,
+            CacheConfig(), GetDefaultPool()));
+
+    ASSERT_FALSE(reader->NeedPrefetch());
+}
+
+TEST_F(PrefetchFileBatchReaderImplTest, SetReadRanges) {
+    auto data_array = PrepareArray(400);
+    int32_t batch_size = 30;
+    int32_t prefetch_max_parallel_num = 3;
+    MockFormatReaderBuilder reader_builder(data_array, data_type_, batch_size);
+    ASSERT_OK_AND_ASSIGN(
+        auto reader,
+        PrefetchFileBatchReaderImpl::Create(
+            /*data_file_path=*/"", &reader_builder, mock_fs_, 
prefetch_max_parallel_num, batch_size,
+            prefetch_max_parallel_num * 2, 
/*enable_adaptive_prefetch_strategy=*/false, executor_,
+            /*initialize_read_ranges=*/false, 
/*prefetch_cache_mode=*/PrefetchCacheMode::ALWAYS,
+            CacheConfig(), GetDefaultPool()));
+    auto prefetch_reader = 
dynamic_cast<PrefetchFileBatchReaderImpl*>(reader.get());
+    ASSERT_FALSE(prefetch_reader->need_prefetch_);
+    prefetch_reader->need_prefetch_ = true;
+    std::vector<std::pair<uint64_t, uint64_t>> ranges = {
+        {0, 100}, {100, 200}, {200, 300}, {300, 400}};
+    ASSERT_OK(prefetch_reader->SetReadRanges(ranges));
+    auto& read_ranges_queue = prefetch_reader->read_ranges_;
+    std::vector<std::pair<uint64_t, uint64_t>> read_ranges;
+    for (auto& iter : read_ranges_queue) {
+        read_ranges.push_back(iter);
+    }
+    ranges.emplace_back(400, 401);
+    ASSERT_EQ(read_ranges, ranges);
+
+    std::vector<std::pair<uint64_t, uint64_t>> read_ranges_0 = {{0, 100}, 
{300, 400}};
+    auto mock_reader_0 = 
dynamic_cast<MockFileBatchReader*>(prefetch_reader->readers_[0].get());
+    ASSERT_EQ(mock_reader_0->GetReadRanges(), read_ranges_0);
+    std::vector<std::pair<uint64_t, uint64_t>> read_ranges_1 = {{100, 200}};
+    auto mock_reader_1 = 
dynamic_cast<MockFileBatchReader*>(prefetch_reader->readers_[1].get());
+    ASSERT_EQ(mock_reader_1->GetReadRanges(), read_ranges_1);
+    std::vector<std::pair<uint64_t, uint64_t>> read_ranges_2 = {{200, 300}};
+    auto mock_reader_2 = 
dynamic_cast<MockFileBatchReader*>(prefetch_reader->readers_[2].get());
+    ASSERT_EQ(mock_reader_2->GetReadRanges(), read_ranges_2);
+}
+
+TEST_F(PrefetchFileBatchReaderImplTest, 
SetReadRangesReturnErrorWhenPushDownFailed) {
+    auto data_array = PrepareArray(100);
+    int32_t batch_size = 10;
+    int32_t prefetch_max_parallel_num = 2;
+    ControlledMockFormatReaderBuilder reader_builder(
+        data_array, data_type_, batch_size,
+        /*read_ranges=*/{{0, 50}, {50, 100}},
+        /*need_prefetch=*/true,
+        /*set_read_ranges_statuses=*/{Status::OK(), Status::IOError("set read 
ranges failed")});
+
+    ASSERT_OK_AND_ASSIGN(
+        auto reader,
+        PrefetchFileBatchReaderImpl::Create(
+            /*data_file_path=*/"", &reader_builder, mock_fs_, 
prefetch_max_parallel_num, batch_size,
+            prefetch_max_parallel_num * 2,
+            /*enable_adaptive_prefetch_strategy=*/false, executor_,
+            /*initialize_read_ranges=*/false, 
/*prefetch_cache_mode=*/PrefetchCacheMode::ALWAYS,
+            CacheConfig(), GetDefaultPool()));
+
+    auto prefetch_reader = 
dynamic_cast<PrefetchFileBatchReaderImpl*>(reader.get());
+    prefetch_reader->need_prefetch_ = true;
+
+    Status status = prefetch_reader->SetReadRanges({{0, 50}, {50, 100}});
+    ASSERT_FALSE(status.ok());
+    ASSERT_TRUE(status.IsIOError());
+}
+
+TEST_F(PrefetchFileBatchReaderImplTest, NeedInitCacheNeverMode) {
+    auto data_array = PrepareArray(10);
+    int32_t batch_size = 5;
+    int32_t prefetch_max_parallel_num = 1;
+    MockFormatReaderBuilder reader_builder(data_array, data_type_, batch_size);
+    ASSERT_OK_AND_ASSIGN(
+        auto reader,
+        PrefetchFileBatchReaderImpl::Create(
+            /*data_file_path=*/"", &reader_builder, mock_fs_, 
prefetch_max_parallel_num, batch_size,
+            prefetch_max_parallel_num * 2,
+            /*enable_adaptive_prefetch_strategy=*/false, executor_,
+            /*initialize_read_ranges=*/false, 
/*prefetch_cache_mode=*/PrefetchCacheMode::NEVER,
+            CacheConfig(), GetDefaultPool()));
+
+    auto prefetch_reader = 
dynamic_cast<PrefetchFileBatchReaderImpl*>(reader.get());
+    ASSERT_FALSE(prefetch_reader->NeedInitCache());
+}
+
+TEST_F(PrefetchFileBatchReaderImplTest, 
WorkloopSetReadStatusWhenCacheInitFailed) {
+    auto data_array = PrepareArray(10);
+    int32_t batch_size = 5;
+    int32_t prefetch_max_parallel_num = 1;
+    MockFormatReaderBuilder reader_builder(data_array, data_type_, batch_size);
+    CacheConfig invalid_cache_config(
+        /*buffer_size_limit=*/512 * 1024,
+        
/*range_size_limit=*/static_cast<uint64_t>(std::numeric_limits<uint32_t>::max())
 + 1,
+        /*hole_size_limit=*/8 * 1024,
+        /*pre_buffer_limit=*/128 * 1024);
+
+    ASSERT_OK_AND_ASSIGN(
+        auto reader,
+        PrefetchFileBatchReaderImpl::Create(
+            /*data_file_path=*/"", &reader_builder, mock_fs_, 
prefetch_max_parallel_num, batch_size,
+            prefetch_max_parallel_num * 2,
+            /*enable_adaptive_prefetch_strategy=*/false, executor_,
+            /*initialize_read_ranges=*/false, 
/*prefetch_cache_mode=*/PrefetchCacheMode::ALWAYS,
+            invalid_cache_config, GetDefaultPool()));
+
+    auto prefetch_reader = 
dynamic_cast<PrefetchFileBatchReaderImpl*>(reader.get());
+    prefetch_reader->Workloop();
+
+    Status status = prefetch_reader->GetReadStatus();
+    ASSERT_FALSE(status.ok());
+    ASSERT_TRUE(status.IsInvalid());
+}
+
+TEST_F(PrefetchFileBatchReaderImplTest, DoReadBatchReturnOkWhenShutdown) {
+    auto data_array = PrepareArray(10);
+    int32_t batch_size = 5;
+    int32_t prefetch_max_parallel_num = 1;
+    MockFormatReaderBuilder reader_builder(data_array, data_type_, batch_size);
+    ASSERT_OK_AND_ASSIGN(
+        auto reader,
+        PrefetchFileBatchReaderImpl::Create(
+            /*data_file_path=*/"", &reader_builder, mock_fs_, 
prefetch_max_parallel_num, batch_size,
+            prefetch_max_parallel_num * 2,
+            /*enable_adaptive_prefetch_strategy=*/false, executor_,
+            /*initialize_read_ranges=*/false, 
/*prefetch_cache_mode=*/PrefetchCacheMode::ALWAYS,
+            CacheConfig(), GetDefaultPool()));
+
+    auto prefetch_reader = 
dynamic_cast<PrefetchFileBatchReaderImpl*>(reader.get());
+    prefetch_reader->is_shutdown_ = true;
+    ASSERT_OK(prefetch_reader->DoReadBatch(/*reader_idx=*/0));
+}
+
+TEST_F(PrefetchFileBatchReaderImplTest, 
DoReadBatchReturnOkWhenNoCurrentReadRange) {
+    auto data_array = PrepareArray(10);
+    int32_t batch_size = 5;
+    int32_t prefetch_max_parallel_num = 1;
+    MockFormatReaderBuilder reader_builder(data_array, data_type_, batch_size);
+    ASSERT_OK_AND_ASSIGN(
+        auto reader,
+        PrefetchFileBatchReaderImpl::Create(
+            /*data_file_path=*/"", &reader_builder, mock_fs_, 
prefetch_max_parallel_num, batch_size,
+            prefetch_max_parallel_num * 2,
+            /*enable_adaptive_prefetch_strategy=*/false, executor_,
+            /*initialize_read_ranges=*/false, 
/*prefetch_cache_mode=*/PrefetchCacheMode::ALWAYS,
+            CacheConfig(), GetDefaultPool()));
+
+    auto prefetch_reader = 
dynamic_cast<PrefetchFileBatchReaderImpl*>(reader.get());
+    prefetch_reader->read_ranges_in_group_ = {{}};
+    ASSERT_OK(prefetch_reader->DoReadBatch(/*reader_idx=*/0));
+}
+
+TEST_F(PrefetchFileBatchReaderImplTest, TestReadWithLargeBatchSize) {
+    auto data_array = PrepareArray(101);
+    int32_t batch_size = 150;
+    int32_t prefetch_max_parallel_num = 3;
+    MockFormatReaderBuilder reader_builder(data_array, data_type_, batch_size);
+    ASSERT_OK_AND_ASSIGN(
+        auto reader,
+        PrefetchFileBatchReaderImpl::Create(
+            /*data_file_path=*/"", &reader_builder, mock_fs_, 
prefetch_max_parallel_num, batch_size,
+            prefetch_max_parallel_num * 2, 
/*enable_adaptive_prefetch_strategy=*/false, executor_,
+            /*initialize_read_ranges=*/true, 
/*prefetch_cache_mode=*/PrefetchCacheMode::ALWAYS,
+            CacheConfig(), GetDefaultPool()));
+    ASSERT_EQ(reader->GetPreviousBatchFirstRowNumber().value(), -1);
+    ASSERT_OK_AND_ASSIGN(auto result_array,
+                         ReadResultCollector::CollectResult(
+                             reader.get(), /*max simulated data processing 
time*/ 100));
+    ASSERT_EQ(reader->GetPreviousBatchFirstRowNumber().value(), 101);
+    auto expected_array = std::make_shared<arrow::ChunkedArray>(data_array);
+    ASSERT_TRUE(result_array->Equals(expected_array));
+}
+
+// Verifies that an IOError injected into one underlying reader (readers_[1]) after
+// prefetching has started does not affect a batch that was already queued from that
+// reader, but is surfaced by a later NextBatchWithBitmap() call.
+// NOTE(review): the test relies on fixed usleep() delays for the prefetch workers to
+// make progress, so it may be timing-sensitive on slow or heavily loaded machines.
+TEST_F(PrefetchFileBatchReaderImplTest, TestPartialReaderSuccessRead) {
+    auto data_array = PrepareArray(101);
+    int32_t batch_size = 10;
+    int32_t prefetch_max_parallel_num = 3;
+    MockFormatReaderBuilder reader_builder(data_array, data_type_, batch_size);
+    ASSERT_OK_AND_ASSIGN(
+        auto reader,
+        PrefetchFileBatchReaderImpl::Create(
+            /*data_file_path=*/"", &reader_builder, mock_fs_, 
prefetch_max_parallel_num, batch_size,
+            prefetch_max_parallel_num, 
/*enable_adaptive_prefetch_strategy=*/false, executor_,
+            /*initialize_read_ranges=*/true, 
/*prefetch_cache_mode=*/PrefetchCacheMode::ALWAYS,
+            CacheConfig(), GetDefaultPool()));
+    auto prefetch_reader = 
dynamic_cast<PrefetchFileBatchReaderImpl*>(reader.get());
+    // Disable randomized batch sizes on every mock reader, presumably so that each
+    // batch covers exactly batch_size rows as the range comments below assume.
+    for (int32_t i = 0; i < prefetch_max_parallel_num; i++) {
+        dynamic_cast<MockFileBatchReader*>(prefetch_reader->readers_[i].get())
+            ->EnableRandomizeBatchSize(false);
+    }
+
+    // NOTE(review): result_array_vector is only appended to and never inspected below.
+    arrow::ArrayVector result_array_vector;
+    ASSERT_EQ(reader->GetPreviousBatchFirstRowNumber().value(), -1);
+    // The first batch must succeed and start at row 0; its length must match the
+    // cardinality of the accompanying bitmap.
+    ASSERT_OK_AND_ASSIGN(auto batch_with_bitmap, 
reader->NextBatchWithBitmap());
+    auto& [batch, bitmap] = batch_with_bitmap;
+    ASSERT_EQ(batch.first->length, bitmap.Cardinality());
+    ASSERT_EQ(reader->GetPreviousBatchFirstRowNumber().value(), 0);
+    ASSERT_OK_AND_ASSIGN(auto array, 
ReadResultCollector::GetArray(std::move(batch)));
+    result_array_vector.push_back(array);
+    ASSERT_OK(prefetch_reader->GetReadStatus());
+    usleep(100000);  // sleep 100ms to ensure that the other data has been 
pushed
+    ASSERT_TRUE(HasValue(prefetch_reader->prefetch_queues_));
+
+    // Set IOError for reader[1] after the first NextBatch().
+    // Now the data in prefetch_queues_[0] is [30,39], prefetch_queues_[1] is 
[10,19],
+    // prefetch_queues_[2] is [20,29],
+    // So, the IOError will occur at [40,49].
+    dynamic_cast<MockFileBatchReader*>(prefetch_reader->readers_[1].get())
+        ->SetNextBatchStatus(Status::IOError("mock error"));
+    usleep(100000);
+    // pop [10,19]
+    ASSERT_OK_AND_ASSIGN(batch_with_bitmap, reader->NextBatchWithBitmap());
+    // now reader1 fetch [40,49] and set error status.
+    usleep(100000);
+    // NOTE(review): if ASSERT_NOK aborts the test like other gtest fatal assertions,
+    // the batch below is not released on failure; consider releasing it first.
+    ASSERT_NOK(reader->NextBatchWithBitmap());
+    ReaderUtils::ReleaseReadBatch(std::move(batch_with_bitmap.first));
+}
+
+// Verifies that when every underlying reader fails with an IOError, NextBatchWithBitmap()
+// returns that IOError, the previous-batch row number is never advanced, and repeated
+// calls keep returning the IOError.
+TEST_F(PrefetchFileBatchReaderImplTest, TestAllReaderFailedWithIOError) {
+    auto data_array = PrepareArray(101);
+    int32_t batch_size = 10;
+    int32_t prefetch_max_parallel_num = 3;
+    MockFormatReaderBuilder reader_builder(data_array, data_type_, batch_size);
+    ASSERT_OK_AND_ASSIGN(
+        auto reader,
+        PrefetchFileBatchReaderImpl::Create(
+            /*data_file_path=*/"", &reader_builder, mock_fs_, 
prefetch_max_parallel_num, batch_size,
+            prefetch_max_parallel_num * 2, 
/*enable_adaptive_prefetch_strategy=*/false, executor_,
+            /*initialize_read_ranges=*/true, 
/*prefetch_cache_mode=*/PrefetchCacheMode::ALWAYS,
+            CacheConfig(), GetDefaultPool()));
+
+    auto prefetch_reader = 
dynamic_cast<PrefetchFileBatchReaderImpl*>(reader.get());
+    // Inject the same IOError into every mock reader before any batch is requested.
+    for (int32_t i = 0; i < prefetch_max_parallel_num; i++) {
+        dynamic_cast<MockFileBatchReader*>(prefetch_reader->readers_[i].get())
+            ->SetNextBatchStatus(Status::IOError("mock error"));
+    }
+
+    ASSERT_EQ(reader->GetPreviousBatchFirstRowNumber().value(), -1);
+    auto batch_result = reader->NextBatchWithBitmap();
+    // A failed read must not advance the previous-batch row number.
+    ASSERT_EQ(reader->GetPreviousBatchFirstRowNumber().value(), -1);
+    ASSERT_FALSE(batch_result.ok());
+    ASSERT_TRUE(batch_result.status().IsIOError());
+    // The error is expected in GetReadStatus() while is_shutdown_ stays false, and no
+    // prefetched data may remain in prefetch_queues_.
+    ASSERT_FALSE(prefetch_reader->is_shutdown_);
+    ASSERT_NOK(prefetch_reader->GetReadStatus());
+    ASSERT_FALSE(HasValue(prefetch_reader->prefetch_queues_));
+
+    // Calling NextBatchWithBitmap() again must keep returning the IOError.
+    auto batch_result2 = reader->NextBatchWithBitmap();
+    ASSERT_EQ(reader->GetPreviousBatchFirstRowNumber().value(), -1);
+    ASSERT_FALSE(batch_result2.ok());
+    ASSERT_TRUE(batch_result2.status().IsIOError());
+}
+
+// Verifies that reading an input with zero rows produces no data at all (CollectResult
+// yields a null result). After EOF, GetPreviousBatchFirstRowNumber() is expected to be
+// 0, the row count of the empty input.
+TEST_F(PrefetchFileBatchReaderImplTest, TestPrefetchWithEmptyData) {
+    auto data_array = PrepareArray(0);
+    int32_t batch_size = 10;
+    int32_t prefetch_max_parallel_num = 3;
+    MockFormatReaderBuilder reader_builder(data_array, data_type_, batch_size);
+    ASSERT_OK_AND_ASSIGN(
+        auto reader,
+        PrefetchFileBatchReaderImpl::Create(
+            /*data_file_path=*/"", &reader_builder, mock_fs_, 
prefetch_max_parallel_num, batch_size,
+            prefetch_max_parallel_num * 2, 
/*enable_adaptive_prefetch_strategy=*/false, executor_,
+            /*initialize_read_ranges=*/true, 
/*prefetch_cache_mode=*/PrefetchCacheMode::ALWAYS,
+            CacheConfig(), GetDefaultPool()));
+    ASSERT_EQ(reader->GetPreviousBatchFirstRowNumber().value(), -1);
+    ASSERT_OK_AND_ASSIGN(auto result_array,
+                         ReadResultCollector::CollectResult(
+                             reader.get(), /*max simulated data processing 
time*/ 100));
+    ASSERT_EQ(reader->GetPreviousBatchFirstRowNumber().value(), 0);
+    // No batches were produced, so the collected result is empty (null).
+    ASSERT_FALSE(result_array);
+}
+
+// Verifies two things:
+// - A single-batch input (10 rows, batch_size 10) read with more parallel readers (6)
+//   than there are batches is returned in full.
+// - Calling NextBatchWithBitmap() after EOF keeps returning an EOF batch.
+TEST_F(PrefetchFileBatchReaderImplTest, TestCallNextBatchAfterReadingEof) {
+    auto data_array = PrepareArray(10);
+    int32_t batch_size = 10;
+    int32_t prefetch_max_parallel_num = 6;
+    MockFormatReaderBuilder reader_builder(data_array, data_type_, batch_size);
+    ASSERT_OK_AND_ASSIGN(
+        auto reader,
+        PrefetchFileBatchReaderImpl::Create(
+            /*data_file_path=*/"", &reader_builder, mock_fs_, 
prefetch_max_parallel_num, batch_size,
+            prefetch_max_parallel_num * 2, 
/*enable_adaptive_prefetch_strategy=*/false, executor_,
+            /*initialize_read_ranges=*/true, 
/*prefetch_cache_mode=*/PrefetchCacheMode::ALWAYS,
+            CacheConfig(), GetDefaultPool()));
+    ASSERT_EQ(reader->GetPreviousBatchFirstRowNumber().value(), -1);
+    ASSERT_OK_AND_ASSIGN(auto result_array,
+                         ReadResultCollector::CollectResult(
+                             reader.get(), /*max simulated data processing 
time*/ 100));
+    ASSERT_EQ(reader->GetPreviousBatchFirstRowNumber().value(), 10);
+    auto expected_array = std::make_shared<arrow::ChunkedArray>(data_array);
+    ASSERT_TRUE(result_array->Equals(expected_array));
+
+    // continue to call NextBatch() after reading eof
+    ASSERT_OK_AND_ASSIGN(auto batch_with_bitmap, 
reader->NextBatchWithBitmap());
+    ASSERT_TRUE(BatchReader::IsEofBatch(batch_with_bitmap));
+}
+
+// Creates a reader and lets it go out of scope without ever requesting a batch.
+// NOTE(review): presumably this checks that destroying a freshly created reader
+// (possibly with prefetch work in flight) neither hangs nor crashes. Confirm the intent;
+// the test makes no assertions after Create().
+TEST_F(PrefetchFileBatchReaderImplTest, TestCreateReaderWithoutNextBatch) {
+    auto data_array = PrepareArray(101);
+    int32_t batch_size = 10;
+    int32_t prefetch_max_parallel_num = 3;
+    MockFormatReaderBuilder reader_builder(data_array, data_type_, batch_size);
+    ASSERT_OK_AND_ASSIGN(
+        auto reader,
+        PrefetchFileBatchReaderImpl::Create(
+            /*data_file_path=*/"", &reader_builder, mock_fs_, 
prefetch_max_parallel_num, batch_size,
+            prefetch_max_parallel_num * 2, 
/*enable_adaptive_prefetch_strategy=*/false, executor_,
+            /*initialize_read_ranges=*/true, 
/*prefetch_cache_mode=*/PrefetchCacheMode::ALWAYS,
+            CacheConfig(), GetDefaultPool()));
+}
+
+// Verifies that Create() rejects each invalid argument in turn, and that a validly
+// created prefetch reader reports SeekToRow() as unsupported.
+TEST_F(PrefetchFileBatchReaderImplTest, TestInvalidCase) {
+    auto data_array = PrepareArray(101);
+    int32_t batch_size = 10;
+    int32_t prefetch_max_parallel_num = 3;
+    std::string data_file_path = "";
+    MockFormatReaderBuilder reader_builder(data_array, data_type_, batch_size);
+    {
+        // prefetch_max_parallel_num == 0 is rejected.
+        ASSERT_NOK(PrefetchFileBatchReaderImpl::Create(
+            data_file_path, &reader_builder, mock_fs_,
+            /*prefetch_max_parallel_num=*/0, batch_size, 2,
+            /*enable_adaptive_prefetch_strategy=*/false, executor_,
+            /*initialize_read_ranges=*/true, 
/*prefetch_cache_mode=*/PrefetchCacheMode::ALWAYS,
+            CacheConfig(), GetDefaultPool()));
+    }
+    {
+        // A negative batch_size is rejected.
+        ASSERT_NOK(PrefetchFileBatchReaderImpl::Create(
+            data_file_path, &reader_builder, mock_fs_, 
prefetch_max_parallel_num, /*batch_size=*/-1,
+            prefetch_max_parallel_num * 2, 
/*enable_adaptive_prefetch_strategy=*/false, executor_,
+            /*initialize_read_ranges=*/true, 
/*prefetch_cache_mode=*/PrefetchCacheMode::ALWAYS,
+            CacheConfig(), GetDefaultPool()));
+    }
+    {
+        // A null executor is rejected.
+        ASSERT_NOK(PrefetchFileBatchReaderImpl::Create(
+            data_file_path, &reader_builder, mock_fs_, 
prefetch_max_parallel_num, batch_size,
+            prefetch_max_parallel_num * 2,
+            /*enable_adaptive_prefetch_strategy=*/false,
+            /*executor=*/nullptr, /*initialize_read_ranges=*/true,
+            /*prefetch_cache_mode=*/PrefetchCacheMode::ALWAYS, CacheConfig(), 
GetDefaultPool()));
+    }
+    {
+        // A null reader builder is rejected.
+        ASSERT_NOK(PrefetchFileBatchReaderImpl::Create(
+            data_file_path, /*reader_builder=*/nullptr, mock_fs_, 
prefetch_max_parallel_num,
+            batch_size, prefetch_max_parallel_num * 2,
+            /*enable_adaptive_prefetch_strategy=*/false, executor_,
+            /*initialize_read_ranges=*/true, 
/*prefetch_cache_mode=*/PrefetchCacheMode::ALWAYS,
+            CacheConfig(), GetDefaultPool()));
+    }
+    {
+        // A null file system is rejected.
+        ASSERT_NOK(PrefetchFileBatchReaderImpl::Create(
+            data_file_path, &reader_builder,
+            /*fs=*/nullptr, prefetch_max_parallel_num, batch_size, 
prefetch_max_parallel_num * 2,
+            /*enable_adaptive_prefetch_strategy=*/false, executor_,
+            /*initialize_read_ranges=*/true, 
/*prefetch_cache_mode=*/PrefetchCacheMode::ALWAYS,
+            CacheConfig(), GetDefaultPool()));
+    }
+    {
+        // Valid arguments succeed, but seeking is not supported by the prefetch reader.
+        ASSERT_OK_AND_ASSIGN(
+            auto reader,
+            PrefetchFileBatchReaderImpl::Create(
+                data_file_path, &reader_builder, mock_fs_, 
prefetch_max_parallel_num, batch_size,
+                prefetch_max_parallel_num * 2,
+                /*enable_adaptive_prefetch_strategy=*/false, executor_,
+                /*initialize_read_ranges=*/true, 
/*prefetch_cache_mode=*/PrefetchCacheMode::ALWAYS,
+                CacheConfig(), GetDefaultPool()));
+        ASSERT_NOK_WITH_MSG(reader->SeekToRow(/*row_number=*/101),
+                            "not support seek to row for prefetch reader");
+    }
+}
+
+/// There are three stripes: [0,30), [30,60), [60,90). The predicate keeps rows with
+/// f1 < 20 or f1 > 70, so after predicate pushdown the stripe [30,60) is filtered out.
+/// The read ranges are [0,30), [30,60), [60,90), so the expected result is [0,30)
+/// followed by [60,90).
+TEST_P(PrefetchFileBatchReaderImplTest, 
TestPrefetchWithPredicatePushdownWithCompleteFiltering) {
+    // Runs once per (file_format, cache_mode) test parameter.
+    auto [file_format, cache_mode] = GetParam();
+    auto data_array = PrepareArray(90);
+    // stripe_row_count == row_index_stride, so filtering can only drop whole stripes.
+    PrepareTestData(file_format, data_array, /*stripe_row_count=*/30, 
/*row_index_stride=*/30);
+    auto schema = arrow::schema(fields_);
+    // Predicate: f1 < 20 OR f1 > 70.
+    ASSERT_OK_AND_ASSIGN(auto predicate,
+                         PredicateBuilder::Or({
+                             PredicateBuilder::LessThan(/*field_index=*/1, 
/*field_name=*/"f1",
+                                                        FieldType::BIGINT, 
Literal(20l)),
+                             PredicateBuilder::GreaterThan(/*field_index=*/1, 
/*field_name=*/"f1",
+                                                           FieldType::BIGINT, 
Literal(70l)),
+                         }));
+
+    auto reader =
+        PreparePrefetchReader(file_format, schema.get(), predicate,
+                              /*selection_bitmap=*/std::nullopt,
+                              /*batch_size=*/10, 
/*prefetch_max_parallel_num=*/3, cache_mode);
+    ASSERT_EQ(reader->GetPreviousBatchFirstRowNumber().value(), -1);
+    ASSERT_OK_AND_ASSIGN(auto result_array,
+                         ReadResultCollector::CollectResult(
+                             reader.get(), /*max simulated data processing 
time*/ 100));
+    ASSERT_EQ(reader->GetPreviousBatchFirstRowNumber().value(), 90);
+
+    // The first and last stripes survive stripe-level filtering and are returned whole.
+    arrow::ArrayVector expected_array_vector;
+    expected_array_vector.push_back(data_array->Slice(0, 30));
+    expected_array_vector.push_back(data_array->Slice(60, 30));
+    auto expected_array = 
std::make_shared<arrow::ChunkedArray>(expected_array_vector);
+    ASSERT_TRUE(CheckEqual(expected_array, result_array));
+}
+
+/// There are three stripes: [0,30), [30,60), [60,90). Each stripe has 3 row groups.
+/// After predicate pushdown, only the row groups [0,20) and [70,90) remain.
+/// The read ranges are [0,30), [30,60), [60,90).
+// NOTE(review): despite "Orc" in its name, this test runs for every file_format
+// parameter supplied by GetParam().
+TEST_P(PrefetchFileBatchReaderImplTest,
+       TestPrefetchWithOrcPredicatePushdownWithRowGroupGranularity) {
+    auto [file_format, cache_mode] = GetParam();
+    auto data_array = PrepareArray(90);
+    // 30-row stripes with a 10-row index stride give 3 row groups per stripe, so
+    // filtering can drop individual row groups rather than whole stripes.
+    PrepareTestData(file_format, data_array, /*stripe_row_count=*/30, 
/*row_index_stride=*/10);
+
+    auto schema = arrow::schema(fields_);
+    // Predicate: f1 < 20 OR f1 > 70.
+    ASSERT_OK_AND_ASSIGN(auto predicate,
+                         PredicateBuilder::Or({
+                             PredicateBuilder::LessThan(/*field_index=*/1, 
/*field_name=*/"f1",
+                                                        FieldType::BIGINT, 
Literal(20l)),
+                             PredicateBuilder::GreaterThan(/*field_index=*/1, 
/*field_name=*/"f1",
+                                                           FieldType::BIGINT, 
Literal(70l)),
+                         }));
+
+    auto reader =
+        PreparePrefetchReader(file_format, schema.get(), predicate,
+                              /*selection_bitmap=*/std::nullopt,
+                              /*batch_size=*/10, 
/*prefetch_max_parallel_num=*/3, cache_mode);
+    ASSERT_OK(reader->RefreshReadRanges());
+    ASSERT_EQ(reader->GetPreviousBatchFirstRowNumber().value(), -1);
+    ASSERT_OK_AND_ASSIGN(auto result_array,
+                         ReadResultCollector::CollectResult(
+                             reader.get(), /*max simulated data processing 
time*/ 100));
+    ASSERT_EQ(reader->GetPreviousBatchFirstRowNumber().value(), 90);
+
+    // Only the surviving row groups [0,20) and [70,90) are expected in the output.
+    arrow::ArrayVector expected_array_vector;
+    expected_array_vector.push_back(data_array->Slice(0, 20));
+    expected_array_vector.push_back(data_array->Slice(70, 20));
+    auto expected_array = 
std::make_shared<arrow::ChunkedArray>(expected_array_vector);
+    ASSERT_TRUE(CheckEqual(expected_array, result_array));
+}
+
+// Verifies that prefetch reading honours a selection bitmap. The collected result must
+// equal the full input with the same bitmap applied by ReaderUtils::ApplyBitmapToReadBatch.
+// NOTE(review): row ids are drawn with paimon::test::RandomNumber, so the bitmap differs
+// between runs unless that helper is seeded deterministically.
+TEST_F(PrefetchFileBatchReaderImplTest, TestPrefetchWithBitmap) {
+    auto data_array = PrepareArray(10000);
+    // Up to 5120 distinct row ids; duplicates collapse in the std::set.
+    std::set<int32_t> valid_row_ids;
+    for (int32_t i = 0; i < 5120; i++) {
+        valid_row_ids.insert(paimon::test::RandomNumber(0, 
data_array->length() - 1));
+    }
+    std::vector<int32_t> bitmap_data(valid_row_ids.begin(), 
valid_row_ids.end());
+    auto bitmap = RoaringBitmap32::From(bitmap_data);
+    MockFormatReaderBuilder reader_builder(data_array, data_type_, bitmap,
+                                           /*read_batch_size=*/100);
+    int32_t prefetch_max_parallel_num = 3;
+    ASSERT_OK_AND_ASSIGN(
+        auto reader,
+        PrefetchFileBatchReaderImpl::Create(
+            /*data_file_path=*/"", &reader_builder, mock_fs_, 
prefetch_max_parallel_num,
+            /*batch_size=*/100, prefetch_max_parallel_num * 2,
+            /*enable_adaptive_prefetch_strategy=*/false, executor_,
+            /*initialize_read_ranges=*/true, 
/*prefetch_cache_mode=*/PrefetchCacheMode::ALWAYS,
+            CacheConfig(), GetDefaultPool()));
+    ASSERT_OK_AND_ASSIGN(auto result_chunk_array, 
ReadResultCollector::CollectResult(reader.get()));
+
+    // Build the expected output by applying the same bitmap to the whole input at once.
+    ASSERT_OK_AND_ASSIGN(auto data_batch, 
ReadResultCollector::GetReadBatch(data_array));
+    ASSERT_OK_AND_ASSIGN(auto expected_batch, 
ReaderUtils::ApplyBitmapToReadBatch(
+                                                  
std::make_pair(std::move(data_batch), bitmap),
+                                                  
arrow::default_memory_pool()));
+    ASSERT_OK_AND_ASSIGN(auto expected_array,
+                         
ReadResultCollector::GetArray(std::move(expected_batch)));
+    auto expected_chunk_array = 
std::make_shared<arrow::ChunkedArray>(expected_array);
+    ASSERT_TRUE(result_chunk_array->Equals(expected_chunk_array));
+}
+
+}  // namespace paimon::test

Reply via email to