This is an automated email from the ASF dual-hosted git repository.
leaves12138 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-cpp.git
The following commit(s) were added to refs/heads/main by this push:
new d5755eb feat: add prefetch file batch reader implementation (#70)
d5755eb is described below
commit d5755eb047f89e3e9affcc38c838980a8bdcd71e
Author: Yonghao Fang <[email protected]>
AuthorDate: Wed Jun 10 11:00:10 2026 +0800
feat: add prefetch file batch reader implementation (#70)
---
.../reader/prefetch_file_batch_reader_impl.cpp | 617 ++++++++++++++
.../reader/prefetch_file_batch_reader_impl.h | 172 ++++
.../prefetch_file_batch_reader_impl_test.cpp | 914 +++++++++++++++++++++
3 files changed, 1703 insertions(+)
diff --git a/src/paimon/common/reader/prefetch_file_batch_reader_impl.cpp
b/src/paimon/common/reader/prefetch_file_batch_reader_impl.cpp
new file mode 100644
index 0000000..95a11d3
--- /dev/null
+++ b/src/paimon/common/reader/prefetch_file_batch_reader_impl.cpp
@@ -0,0 +1,617 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#include "paimon/common/reader/prefetch_file_batch_reader_impl.h"
+
+#include <algorithm>
+#include <chrono>
+#include <future>
+#include <thread>
+
+#include "arrow/array/array_base.h"
+#include "arrow/c/abi.h"
+#include "arrow/c/bridge.h"
+#include "paimon/common/executor/future.h"
+#include "paimon/common/io/cache_input_stream.h"
+#include "paimon/common/metrics/metrics_impl.h"
+#include "paimon/common/reader/reader_utils.h"
+#include "paimon/common/utils/arrow/status_utils.h"
+#include "paimon/common/utils/scope_guard.h"
+#include "paimon/format/reader_builder.h"
+#include "paimon/fs/file_system.h"
+#include "paimon/utils/read_ahead_cache.h"
+
+namespace arrow {
+class Schema;
+} // namespace arrow
+
+namespace paimon {
+
+// Factory for the prefetching reader over `data_file_path`.
+//
+// One underlying reader is built per prefetch lane (`prefetch_max_parallel_num` lanes); the builds
+// are submitted to `executor` and their results are collected before this function goes on.
+// Unless `prefetch_cache_mode` is NEVER, a single ReadAheadCache is created and handed to every
+// lane's CacheInputStream. The per-lane queue capacity is `prefetch_batch_count` divided by the
+// number of lanes, after raising `prefetch_batch_count` to at least one batch per lane.
+//
+// Returns Status::Invalid for a zero lane count, a zero batch count, a non-positive batch size,
+// a null reader_builder / fs / executor, or a built reader that is not a PrefetchFileBatchReader.
+// Errors from opening the file, building a reader or RefreshReadRanges() are passed through.
+Result<std::unique_ptr<PrefetchFileBatchReaderImpl>> PrefetchFileBatchReaderImpl::Create(
+    const std::string& data_file_path, const ReaderBuilder* reader_builder,
+    const std::shared_ptr<FileSystem>& fs, uint32_t prefetch_max_parallel_num, int32_t batch_size,
+    uint32_t prefetch_batch_count, bool enable_adaptive_prefetch_strategy,
+    const std::shared_ptr<Executor>& executor, bool initialize_read_ranges,
+    PrefetchCacheMode prefetch_cache_mode, const CacheConfig& cache_config,
+    const std::shared_ptr<MemoryPool>& pool) {
+    // Argument checks, in the same order as the parameters are documented above.
+    if (prefetch_max_parallel_num == 0) {
+        return Status::Invalid("prefetch max parallel num should be greater than 0.");
+    }
+    if (prefetch_batch_count == 0) {
+        return Status::Invalid("prefetch batch count should be greater than 0.");
+    }
+    if (batch_size <= 0) {
+        return Status::Invalid("batch size should be greater than 0.");
+    }
+    if (!reader_builder) {
+        return Status::Invalid("reader_builder should not be nullptr.");
+    }
+    if (!fs) {
+        return Status::Invalid("file system should not be nullptr.");
+    }
+    if (!executor) {
+        return Status::Invalid("executor should not be nullptr.");
+    }
+
+    // The cache stays null in NEVER mode; it is still passed to every CacheInputStream below.
+    std::shared_ptr<ReadAheadCache> cache;
+    if (prefetch_cache_mode != PrefetchCacheMode::NEVER) {
+        PAIMON_ASSIGN_OR_RAISE(std::shared_ptr<InputStream> input_stream, fs->Open(data_file_path));
+        cache = std::make_shared<ReadAheadCache>(input_stream, cache_config, pool);
+    }
+
+    // Build one reader per lane on the executor. The tasks capture locals by reference; their
+    // results are all collected below before any of those locals go out of scope.
+    std::vector<std::future<Result<std::unique_ptr<FileBatchReader>>>> futures;
+    for (uint32_t lane = 0; lane < prefetch_max_parallel_num; lane++) {
+        futures.push_back(Via(
+            executor.get(),
+            [&fs, &data_file_path, &reader_builder,
+             &cache]() -> Result<std::unique_ptr<FileBatchReader>> {
+                PAIMON_ASSIGN_OR_RAISE(std::unique_ptr<InputStream> input_stream,
+                                       fs->Open(data_file_path));
+                auto cache_input_stream =
+                    std::make_shared<CacheInputStream>(std::move(input_stream), cache);
+                return reader_builder->Build(cache_input_stream);
+            }));
+    }
+
+    std::vector<std::shared_ptr<PrefetchFileBatchReader>> readers;
+    for (auto& build_result : CollectAll(futures)) {
+        if (!build_result.ok()) {
+            return build_result.status();
+        }
+        std::shared_ptr<FileBatchReader> base_reader = std::move(build_result).value();
+        auto prefetch_capable = std::dynamic_pointer_cast<PrefetchFileBatchReader>(base_reader);
+        if (!prefetch_capable) {
+            return Status::Invalid(
+                "failed to cast to prefetch file batch reader. file format not "
+                "support prefetch");
+        }
+        readers.push_back(std::move(prefetch_capable));
+    }
+
+    // Guarantee at least one queued batch per lane before splitting the budget between lanes.
+    const size_t reader_count = readers.size();
+    if (prefetch_batch_count < reader_count) {
+        prefetch_batch_count = reader_count;
+    }
+    uint32_t prefetch_queue_capacity = prefetch_batch_count / reader_count;
+
+    // The constructor is private, hence `new` instead of std::make_unique.
+    auto reader = std::unique_ptr<PrefetchFileBatchReaderImpl>(new PrefetchFileBatchReaderImpl(
+        readers, batch_size, prefetch_queue_capacity, enable_adaptive_prefetch_strategy, executor,
+        cache, prefetch_cache_mode));
+    if (initialize_read_ranges) {
+        // normally initialize read ranges should be false, as set read schema will refresh read
+        // ranges, and set read schema will always be called before read.
+        PAIMON_RETURN_NOT_OK(reader->RefreshReadRanges());
+    }
+    return reader;
+}
+
+// Stores the configuration and creates one prefetch queue, one position counter (starting at 0)
+// and one "working" flag (starting at false) per underlying reader. Argument validation is done
+// by Create() before this private constructor is reached.
+PrefetchFileBatchReaderImpl::PrefetchFileBatchReaderImpl(
+    const std::vector<std::shared_ptr<PrefetchFileBatchReader>>& readers, int32_t batch_size,
+    uint32_t prefetch_queue_capacity, bool enable_adaptive_prefetch_strategy,
+    const std::shared_ptr<Executor>& executor, const std::shared_ptr<ReadAheadCache>& cache,
+    PrefetchCacheMode cache_mode)
+    // `readers` is a const reference, so std::move() on it could only ever copy. Copy explicitly
+    // rather than suggest a move that cannot happen.
+    : readers_(readers),
+      batch_size_(batch_size),
+      executor_(executor),
+      cache_(cache),
+      cache_mode_(cache_mode),
+      prefetch_queue_capacity_(prefetch_queue_capacity),
+      enable_adaptive_prefetch_strategy_(enable_adaptive_prefetch_strategy) {
+    const size_t reader_count = readers_.size();
+    // The per-reader vectors never grow after construction; size them once.
+    prefetch_queues_.reserve(reader_count);
+    readers_pos_.reserve(reader_count);
+    reader_is_working_.reserve(reader_count);
+    for (size_t i = 0; i < reader_count; i++) {
+        prefetch_queues_.emplace_back(std::make_unique<ThreadsafeQueue<PrefetchBatch>>());
+        readers_pos_.emplace_back(std::make_unique<std::atomic<uint64_t>>(0));
+        reader_is_working_.emplace_back(false);
+    }
+    // Narrowing from size_t is intentional; the count originates from a uint32_t in Create().
+    parallel_num_ = static_cast<int32_t>(reader_count);
+}
+
+// Stops the background thread (if any) and drops queued batches. A destructor has nowhere to
+// report a failure, so the status returned by CleanUp() is deliberately discarded.
+PrefetchFileBatchReaderImpl::~PrefetchFileBatchReaderImpl() {
+    [[maybe_unused]] Status cleanup_status = CleanUp();
+}
+
+// Applies `read_schema`, `predicate` and `selection_bitmap` to every underlying reader, remembers
+// the predicate and bitmap, and recomputes the read ranges.
+//
+// `read_schema` is imported once with arrow::ImportSchema and a fresh C schema is exported for
+// each underlying reader. Any failure is returned as-is; after a failure the reader refuses to
+// serve batches until a later SetReadSchema()/RefreshReadRanges() call succeeds.
+Status PrefetchFileBatchReaderImpl::SetReadSchema(
+    ::ArrowSchema* read_schema, const std::shared_ptr<Predicate>& predicate,
+    const std::optional<RoaringBitmap32>& selection_bitmap) {
+    // Import first so the incoming C schema is consumed no matter what happens afterwards.
+    PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(std::shared_ptr<arrow::Schema> schema,
+                                      arrow::ImportSchema(read_schema));
+
+    // Stop prefetching before touching the readers: the background thread started by
+    // NextBatchWithBitmap() schedules ReadBatch tasks that use these same reader objects, and
+    // CleanUp() joins that thread (Workloop waits for its outstanding tasks before it returns).
+    PAIMON_RETURN_NOT_OK(CleanUp());
+    // CleanUp() discarded the previous ranges, so reads must be rejected until they are rebuilt.
+    read_ranges_freshed_ = false;
+
+    for (const auto& reader : readers_) {
+        auto c_schema = std::make_unique<::ArrowSchema>();
+        PAIMON_RETURN_NOT_OK_FROM_ARROW(arrow::ExportSchema(*schema, c_schema.get()));
+        PAIMON_RETURN_NOT_OK(reader->SetReadSchema(c_schema.get(), predicate, selection_bitmap));
+    }
+    selection_bitmap_ = selection_bitmap;
+    predicate_ = predicate;
+    return RefreshReadRanges();
+}
+
+// Resets all prefetch state, asks the first underlying reader for the read ranges, decides
+// whether prefetching is worthwhile, and installs the ranges (filtered by the selection bitmap).
+//
+// Prefetch decision:
+//   - adaptive strategy disabled: always prefetch;
+//   - adaptive strategy enabled: start from the reader's own hint, and turn prefetching off when
+//     the first range alone spans more batches than one prefetch queue can hold.
+Status PrefetchFileBatchReaderImpl::RefreshReadRanges() {
+    PAIMON_RETURN_NOT_OK(CleanUp());
+    // CleanUp() cleared the old ranges. Mark them stale right away so that a failure below cannot
+    // leave NextBatchWithBitmap() running against empty range tables.
+    read_ranges_freshed_ = false;
+
+    // Initialised defensively; GenReadRanges() is expected to overwrite it.
+    bool need_prefetch = false;
+    PAIMON_ASSIGN_OR_RAISE(auto read_ranges, readers_[0]->GenReadRanges(&need_prefetch));
+
+    if (!enable_adaptive_prefetch_strategy_) {
+        need_prefetch = true;
+    } else if (need_prefetch && !read_ranges.empty()) {
+        uint64_t batch_count_in_range =
+            (read_ranges[0].second - read_ranges[0].first) / batch_size_;
+        if (batch_count_in_range > static_cast<uint64_t>(prefetch_queue_capacity_)) {
+            need_prefetch = false;
+        }
+    }
+
+    need_prefetch_ = need_prefetch;
+    PAIMON_RETURN_NOT_OK(SetReadRanges(FilterReadRanges(read_ranges, selection_bitmap_)));
+    read_ranges_freshed_ = true;
+
+    return Status::OK();
+}
+
+// Keeps only the read ranges for which `selection_bitmap` reports ContainsAny(first, second).
+// Without a bitmap every range is kept. The relative order of the ranges is preserved.
+std::vector<std::pair<uint64_t, uint64_t>> PrefetchFileBatchReaderImpl::FilterReadRanges(
+    const std::vector<std::pair<uint64_t, uint64_t>>& read_ranges,
+    const std::optional<RoaringBitmap32>& selection_bitmap) {
+    if (!selection_bitmap.has_value()) {
+        return read_ranges;
+    }
+    const RoaringBitmap32& bitmap = selection_bitmap.value();
+    std::vector<std::pair<uint64_t, uint64_t>> selected;
+    for (const auto& candidate : read_ranges) {
+        if (!bitmap.ContainsAny(candidate.first, candidate.second)) {
+            continue;
+        }
+        selected.push_back(candidate);
+    }
+    return selected;
+}
+
+// Installs `read_ranges`: distributes them round-robin over the underlying readers, pushes each
+// reader's share down to it (only when prefetching with more than one reader), and appends the
+// ranges plus a trailing EOF sentinel range to the consumer-side list.
+//
+// Note that read_ranges_ is appended to, not replaced; CleanUp() is what empties it, and
+// RefreshReadRanges() calls CleanUp() before calling this function.
+Status PrefetchFileBatchReaderImpl::SetReadRanges(
+    const std::vector<std::pair<uint64_t, uint64_t>>& read_ranges) {
+    // Resolve the EOF sentinel before mutating anything, so that a failure here leaves the range
+    // state exactly as it was.
+    std::pair<uint64_t, uint64_t> eof_range;
+    PAIMON_ASSIGN_OR_RAISE(eof_range, EofRange());
+
+    // push down read ranges for reducing IO amplification
+    read_ranges_in_group_ = DispatchReadRanges(read_ranges, readers_.size());
+    if (need_prefetch_ && readers_.size() > 1) {
+        // if prefetching isn't necessary, then setting read ranges won't be needed either.
+        std::vector<std::future<Status>> futures;
+        futures.reserve(readers_.size());
+        for (size_t i = 0; i < readers_.size(); i++) {
+            futures.push_back(Via(executor_.get(), [this, i]() -> Status {
+                return readers_[i]->SetReadRanges(read_ranges_in_group_[i]);
+            }));
+        }
+        for (const auto& status : CollectAll(futures)) {
+            if (!status.ok()) {
+                return status;
+            }
+        }
+    }
+
+    read_ranges_.insert(read_ranges_.end(), read_ranges.begin(), read_ranges.end());
+    // Note: add a special read range out of file row count, for trigger an EOF access.
+    read_ranges_.push_back(eof_range);
+    // The sentinel is added after the push-down above, so the underlying readers never see it.
+    for (auto& group_ranges : read_ranges_in_group_) {
+        group_ranges.push_back(eof_range);
+    }
+    return Status::OK();
+}
+
+// Splits `read_ranges` into `group_count` groups round-robin: range i goes to group
+// (i % group_count), so the ranges inside each group keep their original relative order.
+// Returns `group_count` (possibly empty) groups; with `group_count == 0` the result is empty.
+std::vector<std::vector<std::pair<uint64_t, uint64_t>>>
+PrefetchFileBatchReaderImpl::DispatchReadRanges(
+    const std::vector<std::pair<uint64_t, uint64_t>>& read_ranges, size_t group_count) {
+    std::vector<std::vector<std::pair<uint64_t, uint64_t>>> read_ranges_in_group(group_count);
+    if (group_count == 0) {
+        // Nothing to dispatch to; this also keeps the modulo below from dividing by zero.
+        return read_ranges_in_group;
+    }
+    for (size_t i = 0; i < read_ranges.size(); i++) {
+        read_ranges_in_group[i % group_count].push_back(read_ranges[i]);
+    }
+    return read_ranges_in_group;
+}
+
+// Brings the reader back to its idle state: stops and joins the background thread, forgets all
+// read ranges, releases every batch still sitting in a prefetch queue, rewinds the per-reader
+// positions to 0, resets the cache (if any) and clears the recorded read status.
+//
+// Returns Status::Invalid only if a background thread object exists but is not joinable; in that
+// case nothing after the shutdown request has been reset.
+Status PrefetchFileBatchReaderImpl::CleanUp() {
+    // Request shutdown under the same mutex the work loop waits on, then wake it up.
+    {
+        std::lock_guard<std::mutex> lock(working_mutex_);
+        is_shutdown_ = true;  // set is shutdown and check shutdown to avoid block at queue.push
+        cv_.notify_one();
+    }
+
+    // Join and reset the background thread if it exists
+    if (background_thread_) {
+        if (!background_thread_->joinable()) {
+            return Status::Invalid("background thread is not joinable");
+        }
+        background_thread_->join();
+        background_thread_.reset();
+    }
+
+    // Clear the existing read ranges and prefetch queue
+    read_ranges_.clear();
+    read_ranges_in_group_.clear();
+    for (auto& queue : prefetch_queues_) {
+        for (;;) {
+            std::optional<PrefetchBatch> drained = queue->try_pop();
+            {
+                std::lock_guard<std::mutex> lock(working_mutex_);
+                cv_.notify_one();
+            }
+            if (!drained.has_value()) {
+                break;
+            }
+            ReaderUtils::ReleaseReadBatch(std::move(drained.value().batch.first));
+        }
+    }
+
+    for (size_t i = 0; i < readers_pos_.size(); i++) {
+        readers_pos_[i]->store(0);
+        reader_is_working_[i] = false;
+    }
+    is_shutdown_ = false;
+    if (cache_) {
+        cache_->Reset();
+    }
+    SetReadStatus(Status::OK());
+    return Status::OK();
+}
+
+// Tells whether the read-ahead cache should be initialised for the current read, based on the
+// cache mode and on whether a predicate and/or a selection bitmap is set:
+//   NEVER                       -> false
+//   ALWAYS                      -> true
+//   EXCLUDE_PREDICATE           -> true only without a predicate
+//   EXCLUDE_BITMAP              -> true only without a selection bitmap
+//   EXCLUDE_BITMAP_OR_PREDICATE -> true only without either
+// Any other value asserts in debug builds and yields true.
+bool PrefetchFileBatchReaderImpl::NeedInitCache() const {
+    const bool no_predicate = (predicate_ == nullptr);
+    const bool no_bitmap = !selection_bitmap_.has_value();
+    switch (cache_mode_) {
+        case PrefetchCacheMode::ALWAYS:
+            return true;
+        case PrefetchCacheMode::NEVER:
+            return false;
+        case PrefetchCacheMode::EXCLUDE_PREDICATE:
+            return no_predicate;
+        case PrefetchCacheMode::EXCLUDE_BITMAP:
+            return no_bitmap;
+        case PrefetchCacheMode::EXCLUDE_BITMAP_OR_PREDICATE:
+            return no_predicate && no_bitmap;
+        default:
+            assert(false);
+            return true;
+    }
+}
+
+// Body of the background thread started by NextBatchWithBitmap().
+//
+// It first initialises the read-ahead cache when one exists and NeedInitCache() says so. It then
+// loops, keeping at most one ReadBatch task in flight per reader, until every reader position has
+// reached the EOF marker (uint64 max), a non-OK read status has been recorded, or shutdown has
+// been requested. Before returning it waits for all tasks that are still outstanding.
+void PrefetchFileBatchReaderImpl::Workloop() {
+    // One slot per reader; an invalid future means "no task has been scheduled yet".
+    std::vector<std::future<void>> futures(readers_.size());
+
+    if (cache_ && NeedInitCache()) {
+        auto pre_buffer_ranges = readers_[0]->PreBufferRange();
+        if (!pre_buffer_ranges.ok()) {
+            SetReadStatus(pre_buffer_ranges.status());
+        } else {
+            std::vector<ByteRange> byte_ranges;
+            for (const auto& range : pre_buffer_ranges.value()) {
+                byte_ranges.emplace_back(range.first, range.second);
+            }
+            auto init_status = cache_->Init(std::move(byte_ranges));
+            if (!init_status.ok()) {
+                SetReadStatus(init_status);
+            }
+        }
+    }
+
+    while (true) {
+        if (!GetReadStatus().ok() || is_shutdown_) {
+            break;
+        }
+        const bool all_finished =
+            std::all_of(readers_pos_.begin(), readers_pos_.end(), [](const auto& reader_pos) {
+                return reader_pos->load() == std::numeric_limits<uint64_t>::max();
+            });
+        if (all_finished) {
+            break;
+        }
+
+        bool scheduled_any = false;
+        for (size_t reader_idx = 0; reader_idx < readers_.size(); reader_idx++) {
+            auto& pending = futures[reader_idx];
+            // Skip readers whose previous task has not completed yet.
+            if (pending.valid() &&
+                pending.wait_for(std::chrono::microseconds(0)) != std::future_status::ready) {
+                continue;
+            }
+            if (pending.valid()) {
+                pending.get();
+            }
+            if (prefetch_queues_[reader_idx]->size() >= prefetch_queue_capacity_) {
+                // queue is full, skip
+                continue;
+            }
+            if (readers_pos_[reader_idx]->load() == std::numeric_limits<uint64_t>::max()) {
+                // this reader already delivered its EOF batch
+                continue;
+            }
+            pending = Via(executor_.get(), [this, reader_idx]() { ReadBatch(reader_idx); });
+            scheduled_any = true;
+        }
+
+        if (!scheduled_any) {
+            // Nothing could be scheduled: sleep until shutdown is requested or some reader is
+            // idle, has room in its queue and has not reached EOF.
+            std::unique_lock<std::mutex> lock(working_mutex_);
+            cv_.wait(lock, [this] {
+                if (is_shutdown_) {
+                    return true;
+                }
+                for (size_t i = 0; i < reader_is_working_.size(); i++) {
+                    if (!reader_is_working_[i] &&
+                        prefetch_queues_[i]->size() < prefetch_queue_capacity_ &&
+                        readers_pos_[i]->load() != std::numeric_limits<uint64_t>::max()) {
+                        return true;
+                    }
+                }
+                return false;
+            });
+        }
+    }
+    Wait(futures);
+}
+
+// Task entry point scheduled by Workloop(): runs one read step for `reader_idx` and records a
+// failure in the shared read status instead of returning it.
+void PrefetchFileBatchReaderImpl::ReadBatch(size_t reader_idx) {
+    if (Status status = DoReadBatch(reader_idx); !status.ok()) {
+        SetReadStatus(status);
+    }
+}
+
+// Returns the first range assigned to `reader_idx` whose end lies beyond the reader's current
+// position, or std::nullopt when the position is at or past the end of every assigned range.
+std::optional<std::pair<uint64_t, uint64_t>> PrefetchFileBatchReaderImpl::GetCurrentReadRange(
+    size_t reader_idx) const {
+    const uint64_t position = readers_pos_[reader_idx]->load();
+    const auto& assigned = read_ranges_in_group_[reader_idx];
+    const auto hit = std::find_if(assigned.begin(), assigned.end(),
+                                  [position](const auto& range) { return position < range.second; });
+    if (hit == assigned.end()) {
+        return std::nullopt;
+    }
+    return *hit;
+}
+
+// Makes sure reader `reader_idx` will read next from max(its recorded position, start of
+// `current_read_range`). A seek is issued only when the reader is not already there.
+Status PrefetchFileBatchReaderImpl::EnsureReaderPosition(
+    size_t reader_idx, const std::pair<uint64_t, uint64_t>& current_read_range) const {
+    const uint64_t target_row =
+        std::max(readers_pos_[reader_idx]->load(), current_read_range.first);
+    const auto& reader = readers_[reader_idx];
+    if (reader->GetNextRowToRead() == target_row) {
+        return Status::OK();
+    }
+    return reader->SeekToRow(target_row);
+}
+
+// Post-processes one batch read by reader `reader_idx` for `read_range` and queues it.
+//
+//   - EOF batch: queued under the EOF sentinel range, then the reader position is set to the
+//     EOF marker (uint64 max).
+//   - batch starting at or after the range end: dropped; the position moves to its first row.
+//   - batch crossing the range end: truncated to the rows inside the range (array sliced, bitmap
+//     entries beyond the cut removed); the position moves to the range end.
+//   - batch fully inside the range: the position moves to the reader's next row to read.
+// A non-EOF batch whose bitmap ends up empty is released instead of being queued.
+Status PrefetchFileBatchReaderImpl::HandleReadResult(
+    size_t reader_idx, const std::pair<uint64_t, uint64_t>& read_range,
+    ReadBatchWithBitmap&& read_batch_with_bitmap) {
+    PAIMON_ASSIGN_OR_RAISE(uint64_t first_row_number,
+                           readers_[reader_idx]->GetPreviousBatchFirstRowNumber());
+    auto& queue = prefetch_queues_[reader_idx];
+    auto& reader_pos = *readers_pos_[reader_idx];
+
+    if (BatchReader::IsEofBatch(read_batch_with_bitmap)) {
+        std::pair<uint64_t, uint64_t> eof_range;
+        PAIMON_ASSIGN_OR_RAISE(eof_range, EofRange());
+        queue->push({eof_range, std::move(read_batch_with_bitmap), first_row_number});
+        reader_pos.store(std::numeric_limits<uint64_t>::max());
+        return Status::OK();
+    }
+
+    auto& [read_batch, bitmap] = read_batch_with_bitmap;
+    auto& [c_array, c_schema] = read_batch;
+    const uint64_t range_end = read_range.second;
+
+    if (first_row_number >= range_end) {
+        // fully out of range, data before first_row_number has been filtered out
+        reader_pos.store(first_row_number);
+        ReaderUtils::ReleaseReadBatch(std::move(read_batch));
+        return Status::OK();
+    }
+
+    if (first_row_number + c_array->length > range_end) {
+        // partially out of range, data before read_range.second has been effectively consumed
+        reader_pos.store(range_end);
+        // Round-trip through an arrow::Array to slice, then export back into the same C structs.
+        PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(std::shared_ptr<arrow::Array> src_array,
+                                          arrow::ImportArray(c_array.get(), c_schema.get()));
+        int32_t target_length = range_end - first_row_number;
+        auto array = src_array->Slice(/*offset=*/0, target_length);
+        PAIMON_RETURN_NOT_OK_FROM_ARROW(
+            arrow::ExportArray(*array, c_array.get(), c_schema.get()));
+        bitmap.RemoveRange(target_length, src_array->length());
+    } else {
+        // all within the range, data before readers_[reader_idx]->GetNextRowToRead() has been
+        // effectively consumed
+        reader_pos.store(readers_[reader_idx]->GetNextRowToRead());
+    }
+
+    if (bitmap.IsEmpty()) {
+        ReaderUtils::ReleaseReadBatch(std::move(read_batch));
+        return Status::OK();
+    }
+    queue->push({read_range, std::move(read_batch_with_bitmap), first_row_number});
+    return Status::OK();
+}
+
+// One read step for reader `reader_idx`: pick its current range, position the reader, read the
+// next batch and pass it to HandleReadResult().
+//
+// Returns early (without reading) when an error status is already recorded, when shutdown has
+// been requested, or when the reader has no range left. While a read is in progress the reader's
+// "working" flag is set; it is cleared and the work loop is notified on every exit path after
+// that point.
+Status PrefetchFileBatchReaderImpl::DoReadBatch(size_t reader_idx) {
+    PAIMON_RETURN_NOT_OK(GetReadStatus());
+    if (is_shutdown_) {
+        return Status::OK();
+    }
+    const std::optional<std::pair<uint64_t, uint64_t>> current_read_range =
+        GetCurrentReadRange(reader_idx);
+    if (!current_read_range.has_value()) {
+        // No more read ranges for this reader, gracefully exit.
+        return Status::OK();
+    }
+
+    // Registered before the flag is raised so that the flag is always lowered again.
+    ScopeGuard guard([&]() {
+        std::lock_guard<std::mutex> lock(working_mutex_);
+        reader_is_working_[reader_idx] = false;
+        cv_.notify_one();
+    });
+    {
+        std::lock_guard<std::mutex> lock(working_mutex_);
+        reader_is_working_[reader_idx] = true;
+    }
+
+    const auto& read_range = current_read_range.value();
+    PAIMON_RETURN_NOT_OK(EnsureReaderPosition(reader_idx, read_range));
+
+    FileBatchReader* reader = readers_[reader_idx].get();
+    PAIMON_ASSIGN_OR_RAISE(ReadBatchWithBitmap read_batch_with_bitmap,
+                           reader->NextBatchWithBitmap());
+
+    return HandleReadResult(reader_idx, read_range, std::move(read_batch_with_bitmap));
+}
+
+// Returns the next batch in read-range order, starting the background prefetch thread on first
+// use.
+//
+// Each pass inspects the head of every prefetch queue:
+//   - a head belonging to the range at the front of read_ranges_ is popped and returned;
+//   - when every queue's head is an EOF batch, an EOF batch is returned (the heads stay queued);
+//   - when every queue has a head but none matches, ranges that start before the smallest head
+//     range are dropped from the front of read_ranges_ and the scan repeats;
+//   - otherwise some queue is still empty, so the caller sleeps for a microsecond and retries.
+// Fails with Status::Invalid if the read ranges were never initialised or if a shutdown is
+// observed, and with the recorded read status if a background read failed.
+Result<BatchReader::ReadBatchWithBitmap> PrefetchFileBatchReaderImpl::NextBatchWithBitmap() {
+    if (!read_ranges_freshed_) {
+        return Status::Invalid("prefetch reader read ranges are not initialized");
+    }
+    if (background_thread_ == nullptr) {
+        background_thread_ = std::make_unique<std::thread>([this] { Workloop(); });
+    }
+
+    const size_t queue_count = prefetch_queues_.size();
+    for (;;) {
+        PAIMON_RETURN_NOT_OK(GetReadStatus());
+        if (is_shutdown_) {
+            return Status::Invalid(
+                "prefetch reader has inconsistent state, maybe read while closing reader or change "
+                "read schema");
+        }
+
+        std::optional<std::pair<uint64_t, uint64_t>> smallest_head_range;
+        size_t queues_at_eof = 0;
+        size_t queues_with_head = 0;
+        for (auto& queue : prefetch_queues_) {
+            PAIMON_RETURN_NOT_OK(GetReadStatus());
+            const PrefetchBatch* head = queue->try_front();
+            if (head == nullptr) {
+                continue;
+            }
+            if (!smallest_head_range.has_value() ||
+                head->read_range.first < smallest_head_range.value().first) {
+                smallest_head_range = head->read_range;
+            }
+            ++queues_with_head;
+
+            PAIMON_ASSIGN_OR_RAISE(bool head_is_eof, IsEofRange(head->read_range));
+            if (head_is_eof) {
+                ++queues_at_eof;
+                continue;
+            }
+
+            if (head->read_range == read_ranges_.front()) {
+                auto popped = queue->try_pop();
+                {
+                    // A slot was freed; let the work loop know it may schedule this reader again.
+                    std::lock_guard<std::mutex> lock(working_mutex_);
+                    cv_.notify_one();
+                }
+                previous_batch_first_row_num_ = popped.value().previous_batch_first_row_num;
+                return std::move(popped).value().batch;
+            }
+        }
+
+        if (queues_at_eof == queue_count) {
+            const PrefetchBatch* first_head = prefetch_queues_[0]->try_front();
+            if (first_head == nullptr) {
+                assert(false);
+                return Status::Invalid("peek batch not suppose to be nullptr");
+            }
+            previous_batch_first_row_num_ = first_head->previous_batch_first_row_num;
+            return BatchReader::MakeEofBatchWithBitmap();
+        }
+
+        if (queues_with_head != queue_count) {
+            // At least one queue has nothing to offer yet; give the producers a moment.
+            std::this_thread::sleep_for(std::chrono::microseconds(1));
+            continue;
+        }
+        // Every queue has a head and none matched: ranges starting before the smallest head
+        // range produced no batch, so skip them.
+        while (!read_ranges_.empty() &&
+               read_ranges_.front().first < smallest_head_range.value().first) {
+            read_ranges_.pop_front();
+        }
+    }
+}
+
+// Seeking is not offered by the prefetch reader; always returns Status::NotImplemented and
+// ignores `row_number`.
+Status PrefetchFileBatchReaderImpl::SeekToRow(uint64_t row_number) {
+    Status unsupported = Status::NotImplemented("not support seek to row for prefetch reader");
+    return unsupported;
+}
+
+// Returns the metrics that MetricsImpl::CollectReadMetrics produces for all underlying readers.
+std::shared_ptr<Metrics> PrefetchFileBatchReaderImpl::GetReaderMetrics() const {
+    std::shared_ptr<Metrics> collected = MetricsImpl::CollectReadMetrics(readers_);
+    return collected;
+}
+
+// Returns the file schema as reported by the first underlying reader.
+Result<std::unique_ptr<::ArrowSchema>> PrefetchFileBatchReaderImpl::GetFileSchema() const {
+    assert(!readers_.empty());
+    const auto& first_reader = readers_.front();
+    return first_reader->GetFileSchema();
+}
+
+// Returns the first-row number recorded by the most recent NextBatchWithBitmap() call that
+// returned a batch (uint64 max until such a call has happened, per the member's initializer).
+Result<uint64_t> PrefetchFileBatchReaderImpl::GetPreviousBatchFirstRowNumber() const {
+    const uint64_t first_row_number = previous_batch_first_row_num_;
+    return first_row_number;
+}
+
+// Returns the row count as reported by the first underlying reader.
+Result<uint64_t> PrefetchFileBatchReaderImpl::GetNumberOfRows() const {
+    assert(!readers_.empty());
+    const auto& first_reader = readers_.front();
+    return first_reader->GetNumberOfRows();
+}
+
+// Not supported by the prefetch reader. Asserts in debug builds; otherwise returns uint64 max,
+// which is the value the former `return -1` converted to, now spelled out explicitly.
+uint64_t PrefetchFileBatchReaderImpl::GetNextRowToRead() const {
+    assert(false);
+    return std::numeric_limits<uint64_t>::max();
+}
+
+// Overwrites the shared read status while holding rw_mutex_ exclusively.
+void PrefetchFileBatchReaderImpl::SetReadStatus(const Status& status) {
+    std::lock_guard<std::shared_mutex> exclusive(rw_mutex_);
+    read_status_ = status;
+}
+
+// Returns a copy of the shared read status, taken while holding rw_mutex_ in shared mode.
+Status PrefetchFileBatchReaderImpl::GetReadStatus() const {
+    std::shared_lock<std::shared_mutex> shared(rw_mutex_);
+    Status snapshot = read_status_;
+    return snapshot;
+}
+// A range is the EOF sentinel when it starts at or beyond the file's row count.
+Result<bool> PrefetchFileBatchReaderImpl::IsEofRange(
+    const std::pair<uint64_t, uint64_t>& read_range) const {
+    Result<uint64_t> row_count = GetNumberOfRows();
+    if (!row_count.ok()) {
+        return row_count.status();
+    }
+    return read_range.first >= row_count.value();
+}
+
+// Builds the EOF sentinel range [row count, row count + 1), i.e. a range that lies entirely
+// past the last row of the file.
+Result<std::pair<uint64_t, uint64_t>> PrefetchFileBatchReaderImpl::EofRange() const {
+    Result<uint64_t> row_count = GetNumberOfRows();
+    if (!row_count.ok()) {
+        return row_count.status();
+    }
+    const uint64_t num_rows = row_count.value();
+    return std::make_pair(num_rows, num_rows + 1);
+}
+
+// Stops prefetching (best effort; the CleanUp() status is discarded because Close() returns
+// void) and then closes every underlying reader.
+void PrefetchFileBatchReaderImpl::Close() {
+    [[maybe_unused]] Status cleanup_status = CleanUp();
+    for (size_t i = 0; i < readers_.size(); i++) {
+        readers_[i]->Close();
+    }
+}
+
+} // namespace paimon
diff --git a/src/paimon/common/reader/prefetch_file_batch_reader_impl.h
b/src/paimon/common/reader/prefetch_file_batch_reader_impl.h
new file mode 100644
index 0000000..f2916da
--- /dev/null
+++ b/src/paimon/common/reader/prefetch_file_batch_reader_impl.h
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#pragma once
+
+#include <atomic>
+#include <cassert>
+#include <condition_variable>
+#include <cstddef>
+#include <cstdint>
+#include <deque>
+#include <limits>
+#include <memory>
+#include <mutex>
+#include <optional>
+#include <shared_mutex>
+#include <string>
+#include <thread>
+#include <utility>
+#include <vector>
+
+#include "arrow/c/abi.h"
+#include "paimon/common/utils/threadsafe_queue.h"
+#include "paimon/reader/batch_reader.h"
+#include "paimon/reader/prefetch_file_batch_reader.h"
+#include "paimon/result.h"
+#include "paimon/status.h"
+#include "paimon/utils/read_ahead_cache.h"
+#include "paimon/utils/roaring_bitmap32.h"
+
+struct ArrowSchema;
+
+namespace paimon {
+
+class ReaderBuilder;
+class FileSystem;
+class Executor;
+class Predicate;
+class Metrics;
+
+// Batch reader that reads ahead with several underlying PrefetchFileBatchReader instances over
+// the same file. The read ranges are distributed over the readers, a background thread keeps one
+// bounded queue per reader filled, and NextBatchWithBitmap() hands the batches back in
+// read-range order.
+//
+// NOTE(review): the consumer-facing methods do not look designed for concurrent callers; confirm
+// before sharing one instance between threads.
+class PrefetchFileBatchReaderImpl : public PrefetchFileBatchReader {
+ public:
+    // Factory (the constructor is private). Rejects a zero `prefetch_max_parallel_num`, a zero
+    // `prefetch_batch_count`, a non-positive `batch_size` and a null `reader_builder`, `fs` or
+    // `executor` with Status::Invalid. When `initialize_read_ranges` is true the read ranges are
+    // computed right away; otherwise SetReadSchema() must be called before reading.
+    static Result<std::unique_ptr<PrefetchFileBatchReaderImpl>> Create(
+        const std::string& data_file_path, const ReaderBuilder* reader_builder,
+        const std::shared_ptr<FileSystem>& fs, uint32_t prefetch_max_parallel_num,
+        int32_t batch_size, uint32_t prefetch_batch_count, bool enable_adaptive_prefetch_strategy,
+        const std::shared_ptr<Executor>& executor, bool initialize_read_ranges,
+        PrefetchCacheMode prefetch_cache_mode, const CacheConfig& cache_config,
+        const std::shared_ptr<MemoryPool>& pool);
+
+    ~PrefetchFileBatchReaderImpl() override;
+
+    // Always fails: this reader only serves batches through NextBatchWithBitmap().
+    Result<FileBatchReader::ReadBatch> NextBatch() override {
+        return Status::Invalid(
+            "paimon inner reader PrefetchFileBatchReader should use NextBatchWithBitmap");
+    }
+    // Returns the next batch in read-range order; starts the background thread on first use.
+    Result<FileBatchReader::ReadBatchWithBitmap> NextBatchWithBitmap() override;
+
+    std::shared_ptr<Metrics> GetReaderMetrics() const override;
+
+    Result<std::unique_ptr<::ArrowSchema>> GetFileSchema() const override;
+    // Forwards the schema, predicate and bitmap to every underlying reader, then refreshes the
+    // read ranges.
+    Status SetReadSchema(::ArrowSchema* read_schema, const std::shared_ptr<Predicate>& predicate,
+                         const std::optional<RoaringBitmap32>& selection_bitmap) override;
+
+    // Not supported; returns Status::NotImplemented.
+    Status SeekToRow(uint64_t row_number) override;
+    Result<uint64_t> GetPreviousBatchFirstRowNumber() const override;
+    Result<uint64_t> GetNumberOfRows() const override;
+    // Not supported; asserts in debug builds.
+    uint64_t GetNextRowToRead() const override;
+    void Close() override;
+    Status SetReadRanges(const std::vector<std::pair<uint64_t, uint64_t>>& read_ranges) override;
+
+    // Not supported on the wrapper itself; ranges are generated by the first underlying reader.
+    Result<std::vector<std::pair<uint64_t, uint64_t>>> GenReadRanges(
+        bool* need_prefetch) const override {
+        assert(false);
+        return Status::NotImplemented("gen read ranges not implemented");
+    }
+    bool SupportPreciseBitmapSelection() const override {
+        return readers_[0]->SupportPreciseBitmapSelection();
+    }
+
+    // Resets the prefetch state and recomputes the read ranges and the prefetch decision.
+    Status RefreshReadRanges();
+
+    PrefetchFileBatchReader* GetFirstReader() const {
+        return readers_[0].get();
+    }
+
+    // Prefetch decision taken by the last successful RefreshReadRanges().
+    bool NeedPrefetch() const {
+        return need_prefetch_;
+    }
+
+ private:
+    // One queued batch together with the range it was read for and the row number of its first
+    // row.
+    struct PrefetchBatch {
+        std::pair<uint64_t, uint64_t> read_range;
+        BatchReader::ReadBatchWithBitmap batch;
+        uint64_t previous_batch_first_row_num;
+    };
+
+    PrefetchFileBatchReaderImpl(
+        const std::vector<std::shared_ptr<PrefetchFileBatchReader>>& readers, int32_t batch_size,
+        uint32_t prefetch_queue_capacity, bool enable_adaptive_prefetch_strategy,
+        const std::shared_ptr<Executor>& executor, const std::shared_ptr<ReadAheadCache>& cache,
+        PrefetchCacheMode cache_mode);
+
+    // Stops the background thread and resets ranges, queues, positions, cache and read status.
+    Status CleanUp();
+    // Body of the background thread.
+    void Workloop();
+    void SetReadStatus(const Status& status);
+    Status GetReadStatus() const;
+    Result<bool> IsEofRange(const std::pair<uint64_t, uint64_t>& read_range) const;
+    Status DoReadBatch(size_t reader_idx);
+    void ReadBatch(size_t reader_idx);
+    static std::vector<std::pair<uint64_t, uint64_t>> FilterReadRanges(
+        const std::vector<std::pair<uint64_t, uint64_t>>& read_range,
+        const std::optional<RoaringBitmap32>& selection_bitmap);
+
+    static std::vector<std::vector<std::pair<uint64_t, uint64_t>>> DispatchReadRanges(
+        const std::vector<std::pair<uint64_t, uint64_t>>& read_ranges, size_t reader_count);
+
+    Result<std::pair<uint64_t, uint64_t>> EofRange() const;
+    std::optional<std::pair<uint64_t, uint64_t>> GetCurrentReadRange(size_t reader_idx) const;
+    Status EnsureReaderPosition(size_t reader_idx,
+                                const std::pair<uint64_t, uint64_t>& read_range) const;
+    Status HandleReadResult(size_t reader_idx, const std::pair<uint64_t, uint64_t>& read_range,
+                            FileBatchReader::ReadBatchWithBitmap&& read_batch_with_bitmap);
+    bool NeedInitCache() const;
+
+    std::vector<std::shared_ptr<PrefetchFileBatchReader>> readers_;
+    // The meaning of readers_pos_ is: all data before this pos has been filtered out or effectively
+    // consumed, and the data after this pos may need to be read in the next round of reading.
+    std::vector<std::unique_ptr<std::atomic<uint64_t>>> readers_pos_;
+    // NOTE(review): not referenced by the implementation file added in the same commit.
+    std::vector<std::unique_ptr<std::atomic<uint64_t>>> seek_cnt_;
+    const int32_t batch_size_;
+    std::optional<RoaringBitmap32> selection_bitmap_;
+    std::shared_ptr<Predicate> predicate_;
+    // Ranges still to be served, in order, followed by an EOF sentinel range.
+    std::deque<std::pair<uint64_t, uint64_t>> read_ranges_;
+    // Per-reader share of the ranges (indexed like readers_), each ending with the EOF sentinel.
+    std::vector<std::vector<std::pair<uint64_t, uint64_t>>> read_ranges_in_group_;
+    std::vector<std::unique_ptr<ThreadsafeQueue<PrefetchBatch>>> prefetch_queues_;
+    // Whether a read is currently in progress for each reader; guarded by working_mutex_.
+    std::vector<bool> reader_is_working_;
+    std::mutex working_mutex_;
+    std::condition_variable cv_;
+    std::shared_ptr<Executor> executor_;
+    std::shared_ptr<ReadAheadCache> cache_;
+    PrefetchCacheMode cache_mode_;
+
+    // Guards read_status_.
+    mutable std::shared_mutex rw_mutex_;
+    std::unique_ptr<std::thread> background_thread_;
+    Status read_status_;
+    std::atomic<bool> is_shutdown_ = false;
+    uint64_t previous_batch_first_row_num_ = std::numeric_limits<uint64_t>::max();
+    bool need_prefetch_ = false;
+    bool read_ranges_freshed_ = false;
+    const uint32_t prefetch_queue_capacity_;
+    const bool enable_adaptive_prefetch_strategy_;
+    int32_t parallel_num_ = 0;
+};
+} // namespace paimon
diff --git a/src/paimon/common/reader/prefetch_file_batch_reader_impl_test.cpp
b/src/paimon/common/reader/prefetch_file_batch_reader_impl_test.cpp
new file mode 100644
index 0000000..e9832c1
--- /dev/null
+++ b/src/paimon/common/reader/prefetch_file_batch_reader_impl_test.cpp
@@ -0,0 +1,914 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#include "paimon/common/reader/prefetch_file_batch_reader_impl.h"
+
+#include <atomic>
+#include <set>
+
+#include "arrow/compute/api.h"
+#include "arrow/ipc/api.h"
+#include "gtest/gtest.h"
+#include "paimon/common/utils/path_util.h"
+#include "paimon/executor.h"
+#include "paimon/format/file_format.h"
+#include "paimon/format/file_format_factory.h"
+#include "paimon/format/format_writer.h"
+#include "paimon/fs/file_system_factory.h"
+#include "paimon/fs/local/local_file_system.h"
+#include "paimon/predicate/predicate_builder.h"
+#include "paimon/testing/mock/mock_file_batch_reader.h"
+#include "paimon/testing/mock/mock_file_system.h"
+#include "paimon/testing/mock/mock_format_reader_builder.h"
+#include "paimon/testing/utils/read_result_collector.h"
+#include "paimon/testing/utils/testharness.h"
+#include "paimon/utils/read_ahead_cache.h"
+
+namespace paimon::test {
+
+// MockFileBatchReader whose GenReadRanges() answer (ranges and prefetch hint) and SetReadRanges()
+// outcome are fixed at construction time, so tests can steer the prefetch reader.
+class ControlledMockFileBatchReader : public MockFileBatchReader {
+ public:
+    ControlledMockFileBatchReader(const std::shared_ptr<arrow::Array>& data,
+                                  const std::shared_ptr<arrow::DataType>& file_schema,
+                                  int32_t read_batch_size,
+                                  std::vector<std::pair<uint64_t, uint64_t>> read_ranges,
+                                  bool need_prefetch, Status set_read_ranges_status = Status::OK())
+        : MockFileBatchReader(data, file_schema, read_batch_size),
+          read_ranges_(std::move(read_ranges)),
+          need_prefetch_(need_prefetch),
+          set_read_ranges_status_(std::move(set_read_ranges_status)) {}
+
+    // Reports the canned prefetch hint through `need_prefetch` and returns the canned ranges.
+    Result<std::vector<std::pair<uint64_t, uint64_t>>> GenReadRanges(
+        bool* need_prefetch) const override {
+        *need_prefetch = need_prefetch_;
+        return read_ranges_;
+    }
+
+    // Delegates to the mock unless a failure status was injected, in which case that status is
+    // returned and the mock is left untouched.
+    Status SetReadRanges(const std::vector<std::pair<uint64_t, uint64_t>>& read_ranges) override {
+        if (set_read_ranges_status_.ok()) {
+            return MockFileBatchReader::SetReadRanges(read_ranges);
+        }
+        return set_read_ranges_status_;
+    }
+
+ private:
+    std::vector<std::pair<uint64_t, uint64_t>> read_ranges_;
+    bool need_prefetch_ = true;
+    Status set_read_ranges_status_;
+};
+
+// ReaderBuilder that produces ControlledMockFileBatchReader instances. The i-th reader built
+// (counting from 0) gets set_read_ranges_statuses[i] as its SetReadRanges() outcome, or
+// Status::OK() once the list is exhausted.
+class ControlledMockFormatReaderBuilder : public ReaderBuilder {
+ public:
+    ControlledMockFormatReaderBuilder(const std::shared_ptr<arrow::Array>& data,
+                                      const std::shared_ptr<arrow::DataType>& schema,
+                                      int32_t read_batch_size,
+                                      std::vector<std::pair<uint64_t, uint64_t>> read_ranges,
+                                      bool need_prefetch,
+                                      std::vector<Status> set_read_ranges_statuses)
+        : data_(data),
+          schema_(schema),
+          read_batch_size_(read_batch_size),
+          read_ranges_(std::move(read_ranges)),
+          need_prefetch_(need_prefetch),
+          set_read_ranges_statuses_(std::move(set_read_ranges_statuses)) {}
+
+    // The memory pool is ignored by this mock.
+    ReaderBuilder* WithMemoryPool(const std::shared_ptr<MemoryPool>& pool) override {
+        return this;
+    }
+
+    // The input stream is ignored; the reader serves the in-memory array given at construction.
+    Result<std::unique_ptr<FileBatchReader>> Build(
+        const std::shared_ptr<InputStream>& path) const override {
+        // PrefetchFileBatchReaderImpl::Create() calls Build() from executor tasks, so the
+        // counter must be incremented atomically to hand out each index exactly once.
+        const size_t index = build_count_.fetch_add(1);
+        Status set_read_ranges_status = index < set_read_ranges_statuses_.size()
+                                            ? set_read_ranges_statuses_[index]
+                                            : Status::OK();
+        return std::make_unique<ControlledMockFileBatchReader>(
+            data_, schema_, read_batch_size_, read_ranges_, need_prefetch_, set_read_ranges_status);
+    }
+
+    Result<std::unique_ptr<FileBatchReader>> Build(const std::string& path) const override {
+        return Status::Invalid("do not support build reader with path in mock format");
+    }
+
+ private:
+    std::shared_ptr<arrow::Array> data_;
+    std::shared_ptr<arrow::DataType> schema_;
+    int32_t read_batch_size_ = 0;
+    std::vector<std::pair<uint64_t, uint64_t>> read_ranges_;
+    bool need_prefetch_ = true;
+    std::vector<Status> set_read_ranges_statuses_;
+    // Number of Build(InputStream) calls so far; mutable because Build() is const.
+    mutable std::atomic<size_t> build_count_{0};
+};
+
+struct TestParam {  // Parameters for the TEST_P cases: file format and prefetch cache mode.
+    std::string file_format;       // format identifier, e.g. "parquet" or "orc"
+    PrefetchCacheMode cache_mode;  // cache mode handed to PrefetchFileBatchReaderImpl::Create
+};
+
+/// Shared fixture for the PrefetchFileBatchReaderImpl tests.
+///
+/// TEST_F cases use the mock file system together with mock reader builders; TEST_P cases are
+/// parameterized on TestParam and read files written by PrepareTestData() into `dir_`.
+class PrefetchFileBatchReaderImplTest : public ::testing::Test,
+                                        public ::testing::WithParamInterface<TestParam> {
+ public:
+    /// Creates the schema (f0: utf8, f1: int64, f2: boolean), both file systems,
+    /// a two-thread executor and a unique test directory.
+    void SetUp() override {
+        fields_ = {arrow::field("f0", arrow::utf8()), arrow::field("f1", arrow::int64()),
+                   arrow::field("f2", arrow::boolean())};
+        data_type_ = arrow::struct_(fields_);
+        mock_fs_ = std::make_shared<MockFileSystem>();
+        local_fs_ = std::make_shared<LocalFileSystem>();
+        executor_ = CreateDefaultExecutor(/*thread_count=*/2);
+        dir_ = ::paimon::test::UniqueTestDirectory::Create();
+        ASSERT_TRUE(dir_);
+    }
+    void TearDown() override {}
+
+    /// Builds a struct array with one row per id i in [offset, offset + length):
+    /// f0 = "str_<i>", f1 = i, f2 = (i % 2 != 0).
+    std::shared_ptr<arrow::Array> PrepareArray(int32_t length, int32_t offset = 0) {
+        arrow::StructBuilder struct_builder(
+            data_type_, arrow::default_memory_pool(),
+            {std::make_shared<arrow::StringBuilder>(), std::make_shared<arrow::Int64Builder>(),
+             std::make_shared<arrow::BooleanBuilder>()});
+        auto string_builder = static_cast<arrow::StringBuilder*>(struct_builder.field_builder(0));
+        auto big_int_builder = static_cast<arrow::Int64Builder*>(struct_builder.field_builder(1));
+        auto bool_builder = static_cast<arrow::BooleanBuilder*>(struct_builder.field_builder(2));
+        for (int32_t i = 0 + offset; i < length + offset; ++i) {
+            EXPECT_TRUE(struct_builder.Append().ok());
+            EXPECT_TRUE(string_builder->Append("str_" + std::to_string(i)).ok());
+            EXPECT_TRUE(big_int_builder->Append(i).ok());
+            EXPECT_TRUE(bool_builder->Append(static_cast<bool>(i % 2)).ok());
+        }
+        std::shared_ptr<arrow::Array> array;
+        EXPECT_TRUE(struct_builder.Finish(&array).ok());
+        return array;
+    }
+
+    /// Writes `array` to "<dir_>/file.<file_format_str>" in batches of `row_index_stride` rows.
+    ///
+    /// `row_index_stride` is also passed as "parquet.write.max-row-group-length" and
+    /// "orc.row.index.stride". `stripe_row_count` is currently not used by this helper.
+    void PrepareTestData(const std::string& file_format_str,
+                         const std::shared_ptr<arrow::Array>& array, int32_t stripe_row_count,
+                         int32_t row_index_stride) const {
+        // Guard the modulo and division below against a zero or negative stride.
+        ASSERT_GT(row_index_stride, 0);
+        // for simple case, assume that array.length() % row_index_stride == 0
+        ASSERT_EQ(array->length() % row_index_stride, 0);
+        arrow::Schema schema(array->type()->fields());
+        ::ArrowSchema c_schema;
+        ASSERT_TRUE(arrow::ExportSchema(schema, &c_schema).ok());
+        ASSERT_OK_AND_ASSIGN(
+            std::unique_ptr<FileFormat> file_format,
+            FileFormatFactory::Get(
+                file_format_str,
+                {{"parquet.write.max-row-group-length", std::to_string(row_index_stride)},
+                 {"orc.row.index.stride", std::to_string(row_index_stride)}}));
+
+        ASSERT_OK_AND_ASSIGN(auto writer_builder,
+                             file_format->CreateWriterBuilder(&c_schema, 1024));
+        ASSERT_OK_AND_ASSIGN(
+            std::shared_ptr<OutputStream> out,
+            local_fs_->Create(PathUtil::JoinPath(dir_->Str(), "file." + file_format_str),
+                              /*overwrite=*/false));
+        ASSERT_OK_AND_ASSIGN(auto writer, writer_builder->Build(out, "zstd"));
+
+        int32_t write_batch_count = array->length() / row_index_stride;
+        for (int32_t i = 0; i < write_batch_count; i++) {
+            auto slice = array->Slice(i * row_index_stride, row_index_stride);
+            // Concatenating the single slice yields a standalone copy of it for export.
+            auto copied_array = arrow::Concatenate({slice}).ValueOr(nullptr);
+            ASSERT_TRUE(copied_array);
+            ::ArrowArray c_array;
+            ASSERT_TRUE(arrow::ExportArray(*copied_array, &c_array).ok());
+            ASSERT_OK(writer->AddBatch(&c_array));
+        }
+        ASSERT_OK(writer->Flush());
+        ASSERT_OK(writer->Finish());
+        ASSERT_OK(out->Flush());
+        ASSERT_OK(out->Close());
+    }
+
+    /// Creates a prefetch reader over "<dir_>/file.<format identifier>" on the local file
+    /// system and applies the read schema, predicate and selection bitmap to it.
+    ///
+    /// The reader is created with initialize_read_ranges=false, a prefetch batch count of
+    /// `prefetch_max_parallel_num * 2` and its own executor.
+    std::unique_ptr<PrefetchFileBatchReaderImpl> PreparePrefetchReader(
+        const std::string& file_format_str, const arrow::Schema* read_schema,
+        const std::shared_ptr<Predicate>& predicate,
+        const std::optional<RoaringBitmap32>& selection_bitmap, int32_t batch_size,
+        int32_t prefetch_max_parallel_num, PrefetchCacheMode cache_mode) const {
+        EXPECT_OK_AND_ASSIGN(std::unique_ptr<FileFormat> file_format,
+                             FileFormatFactory::Get(file_format_str, {}));
+        EXPECT_OK_AND_ASSIGN(auto reader_builder, file_format->CreateReaderBuilder(batch_size));
+        EXPECT_OK_AND_ASSIGN(
+            std::unique_ptr<PrefetchFileBatchReaderImpl> reader,
+            PrefetchFileBatchReaderImpl::Create(
+                PathUtil::JoinPath(dir_->Str(), "file." + file_format->Identifier()),
+                reader_builder.get(), local_fs_, prefetch_max_parallel_num, batch_size,
+                prefetch_max_parallel_num * 2, /*enable_adaptive_prefetch_strategy=*/false,
+                CreateDefaultExecutor(prefetch_max_parallel_num - 1),
+                /*initialize_read_ranges=*/false, cache_mode, CacheConfig(), GetDefaultPool()));
+        std::unique_ptr<ArrowSchema> c_schema = std::make_unique<ArrowSchema>();
+        auto arrow_status = arrow::ExportSchema(*read_schema, c_schema.get());
+        EXPECT_TRUE(arrow_status.ok());
+        EXPECT_OK(reader->SetReadSchema(c_schema.get(), predicate, selection_bitmap));
+        return reader;
+    }
+
+    /// Returns true when at least one of the prefetch queues is non-empty.
+    bool HasValue(
+        const std::vector<
+            std::unique_ptr<ThreadsafeQueue<PrefetchFileBatchReaderImpl::PrefetchBatch>>>&
+            prefetch_queues) {
+        for (const auto& queue : prefetch_queues) {
+            if (!queue->empty()) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    /// Compares two chunked arrays; on mismatch prints both to stdout to ease debugging.
+    bool CheckEqual(const std::shared_ptr<arrow::ChunkedArray>& lhs,
+                    const std::shared_ptr<arrow::ChunkedArray>& rhs) {
+        if (lhs->Equals(rhs)) {
+            return true;
+        }
+        // Render the arrays only when they differ; pretty-printing is wasted work otherwise.
+        std::string lhs_str, rhs_str;
+        auto print_option = arrow::PrettyPrintOptions::Defaults();
+        print_option.window = 1000;
+        print_option.container_window = 1000;
+        EXPECT_TRUE(arrow::PrettyPrint(*lhs, print_option, &lhs_str).ok());
+        EXPECT_TRUE(arrow::PrettyPrint(*rhs, print_option, &rhs_str).ok());
+        std::cout << "lhs array: " << lhs_str << ", rhs array: " << rhs_str << '\n';
+        return false;
+    }
+
+ private:
+    arrow::FieldVector fields_;
+    std::shared_ptr<arrow::DataType> data_type_;
+    std::shared_ptr<FileSystem> mock_fs_;
+    std::shared_ptr<FileSystem> local_fs_;
+    std::unique_ptr<paimon::test::UniqueTestDirectory> dir_;
+    std::shared_ptr<Executor> executor_;
+};
+
+/// Returns every (file format, cache mode) combination to test: all cache modes for
+/// "parquet", followed by all cache modes for "orc" when PAIMON_ENABLE_ORC is defined.
+std::vector<TestParam> PrepareTestParam() {
+    const PrefetchCacheMode cache_modes[] = {
+        PrefetchCacheMode::ALWAYS, PrefetchCacheMode::EXCLUDE_BITMAP,
+        PrefetchCacheMode::EXCLUDE_PREDICATE, PrefetchCacheMode::EXCLUDE_BITMAP_OR_PREDICATE,
+        PrefetchCacheMode::NEVER};
+    std::vector<std::string> formats = {"parquet"};
+#ifdef PAIMON_ENABLE_ORC
+    formats.emplace_back("orc");
+#endif
+    std::vector<TestParam> params;
+    for (const auto& format : formats) {
+        for (const auto mode : cache_modes) {
+            params.push_back(TestParam{format, mode});
+        }
+    }
+    return params;
+}
+
+INSTANTIATE_TEST_SUITE_P(TestParam, PrefetchFileBatchReaderImplTest,
+                         ::testing::ValuesIn(PrepareTestParam()));  // one run per TestParam
+
+// Reads 101 mock rows through the prefetch reader at several parallelism levels and expects
+// the collected result to equal the input each time.
+TEST_F(PrefetchFileBatchReaderImplTest, TestSimple) {
+    const int32_t batch_size = 10;
+    auto source_rows = PrepareArray(101);
+    auto expected_rows = std::make_shared<arrow::ChunkedArray>(source_rows);
+    for (auto parallelism : {1, 2, 3, 5, 8, 10}) {
+        MockFormatReaderBuilder reader_builder(source_rows, data_type_, batch_size);
+        ASSERT_OK_AND_ASSIGN(
+            auto reader,
+            PrefetchFileBatchReaderImpl::Create(
+                /*data_file_path=*/"", &reader_builder, mock_fs_, parallelism, batch_size,
+                /*prefetch_batch_count=*/parallelism * 2,
+                /*enable_adaptive_prefetch_strategy=*/false, executor_,
+                /*initialize_read_ranges=*/true,
+                /*prefetch_cache_mode=*/PrefetchCacheMode::ALWAYS, CacheConfig(),
+                GetDefaultPool()));
+        // No batch has been returned yet.
+        ASSERT_EQ(reader->GetPreviousBatchFirstRowNumber().value(), -1);
+        ASSERT_OK_AND_ASSIGN(
+            auto collected_rows,
+            ReadResultCollector::CollectResult(reader.get(),
+                                               /*max simulated data processing time*/ 100));
+        ASSERT_EQ(reader->GetPreviousBatchFirstRowNumber().value(), 101);
+        ASSERT_TRUE(collected_rows->Equals(expected_rows));
+    }
+}
+
+// Simulates a read limit: pulls only 8 batches, closes the reader, then checks that the
+// reader metrics are still available.
+TEST_F(PrefetchFileBatchReaderImplTest, TestReadWithLimits) {
+    const int32_t batch_size = 10;
+    const int32_t parallelism = 12;
+    auto source_rows = PrepareArray(101);
+
+    MockFormatReaderBuilder reader_builder(source_rows, data_type_, batch_size);
+    ASSERT_OK_AND_ASSIGN(
+        auto reader,
+        PrefetchFileBatchReaderImpl::Create(
+            /*data_file_path=*/"", &reader_builder, mock_fs_, parallelism, batch_size,
+            /*prefetch_batch_count=*/parallelism * 2,
+            /*enable_adaptive_prefetch_strategy=*/false, executor_,
+            /*initialize_read_ranges=*/true,
+            /*prefetch_cache_mode=*/PrefetchCacheMode::ALWAYS, CacheConfig(),
+            GetDefaultPool()));
+
+    int32_t batches_left = 8;
+    while (batches_left-- > 0) {
+        ASSERT_OK_AND_ASSIGN(BatchReader::ReadBatchWithBitmap next_batch,
+                             reader->NextBatchWithBitmap());
+        auto& [raw_batch, selection] = next_batch;
+        // The bitmap must select exactly as many rows as the batch holds.
+        ASSERT_EQ(raw_batch.first->length, selection.Cardinality());
+        ASSERT_OK_AND_ASSIGN(std::shared_ptr<arrow::Array> rows,
+                             ReadResultCollector::GetArray(std::move(raw_batch)));
+        ASSERT_TRUE(rows);
+    }
+    reader->Close();
+
+    // test metrics
+    auto metrics = reader->GetReaderMetrics();
+    ASSERT_TRUE(metrics);
+}
+
+// A reader created with initialize_read_ranges=false must reject NextBatchWithBitmap()
+// with a "read ranges are not initialized" error.
+TEST_F(PrefetchFileBatchReaderImplTest, TestReadWithoutInitializeReadRanges) {
+    const int32_t batch_size = 10;
+    const int32_t parallelism = 12;
+    auto source_rows = PrepareArray(101);
+
+    MockFormatReaderBuilder reader_builder(source_rows, data_type_, batch_size);
+    ASSERT_OK_AND_ASSIGN(
+        auto reader,
+        PrefetchFileBatchReaderImpl::Create(
+            /*data_file_path=*/"", &reader_builder, mock_fs_, parallelism, batch_size,
+            /*prefetch_batch_count=*/parallelism * 2,
+            /*enable_adaptive_prefetch_strategy=*/false, executor_,
+            /*initialize_read_ranges=*/false,
+            /*prefetch_cache_mode=*/PrefetchCacheMode::ALWAYS, CacheConfig(),
+            GetDefaultPool()));
+    // Reading before any read ranges were set is expected to fail.
+    ASSERT_NOK_WITH_MSG(reader->NextBatchWithBitmap(),
+                        "prefetch reader read ranges are not initialized");
+    reader->Close();
+}
+
+// Without a selection bitmap, FilterReadRanges() must return the ranges unchanged.
+TEST_F(PrefetchFileBatchReaderImplTest, FilterReadRangesWithoutBitmap) {
+    // Ten consecutive ranges: {0, 1000}, {1000, 2000}, ..., {9000, 10000}.
+    std::vector<std::pair<uint64_t, uint64_t>> all_ranges;
+    for (uint64_t begin = 0; begin < 10000; begin += 1000) {
+        all_ranges.emplace_back(begin, begin + 1000);
+    }
+    auto kept_ranges = PrefetchFileBatchReaderImpl::FilterReadRanges(all_ranges, std::nullopt);
+    ASSERT_EQ(kept_ranges, all_ranges);
+}
+
+// With an empty selection bitmap, FilterReadRanges() must drop every range.
+TEST_F(PrefetchFileBatchReaderImplTest, FilterReadRangesWithAllZeroBitmap) {
+    // Ten consecutive ranges: {0, 1000}, {1000, 2000}, ..., {9000, 10000}.
+    std::vector<std::pair<uint64_t, uint64_t>> all_ranges;
+    for (uint64_t begin = 0; begin < 10000; begin += 1000) {
+        all_ranges.emplace_back(begin, begin + 1000);
+    }
+    auto empty_selection = RoaringBitmap32::From({});
+    auto kept_ranges = PrefetchFileBatchReaderImpl::FilterReadRanges(all_ranges, empty_selection);
+    ASSERT_TRUE(kept_ranges.empty());
+}
+
+// With rows [1000, 2000) and [3000, 6500) selected, FilterReadRanges() is expected to keep
+// exactly the ranges listed in expected_filtered_ranges; {6000, 7000} is only partly selected.
+TEST_F(PrefetchFileBatchReaderImplTest, FilterReadRangesWithBitmap) {
+    // The two id ranges are disjoint and ascending, so the vector is already sorted and
+    // duplicate-free. No data array is needed: only ranges and the bitmap are involved.
+    std::vector<int32_t> bitmap_data;
+    for (int32_t i = 1000; i < 2000; i++) {
+        bitmap_data.push_back(i);
+    }
+    for (int32_t i = 3000; i < 6500; i++) {
+        bitmap_data.push_back(i);
+    }
+    auto bitmap = RoaringBitmap32::From(bitmap_data);
+    std::vector<std::pair<uint64_t, uint64_t>> read_ranges = {
+        {0, 1000},    {1000, 2000}, {2000, 3000}, {3000, 4000}, {4000, 5000},
+        {5000, 6000}, {6000, 7000}, {7000, 8000}, {8000, 9000}, {9000, 10000}};
+    auto filtered_ranges = PrefetchFileBatchReaderImpl::FilterReadRanges(read_ranges, bitmap);
+    std::vector<std::pair<uint64_t, uint64_t>> expected_filtered_ranges = {
+        {1000, 2000}, {3000, 4000}, {4000, 5000}, {5000, 6000}, {6000, 7000}};
+    ASSERT_EQ(expected_filtered_ranges, filtered_ranges);
+}
+
+// Dispatching no ranges over 3 groups must still produce 3 groups, all of them empty.
+TEST_F(PrefetchFileBatchReaderImplTest, DispatchReadRangesEmpty) {
+    std::vector<std::pair<uint64_t, uint64_t>> no_ranges;
+    auto groups = PrefetchFileBatchReaderImpl::DispatchReadRanges(no_ranges, 3);
+    ASSERT_EQ(groups.size(), 3);
+    for (const auto& group : groups) {
+        ASSERT_TRUE(group.empty());
+    }
+}
+
+// Four ranges dispatched over 3 groups: the first three ranges go to groups 0, 1 and 2 in
+// order, and the fourth range is expected back in group 0.
+TEST_F(PrefetchFileBatchReaderImplTest, DispatchReadRanges) {
+    using ReadRanges = std::vector<std::pair<uint64_t, uint64_t>>;
+    ReadRanges input_ranges = {{0, 10000}, {10000, 20000}, {20000, 30000}, {30000, 40000}};
+    auto groups = PrefetchFileBatchReaderImpl::DispatchReadRanges(input_ranges, 3);
+
+    ReadRanges expected_first = {{0, 10000}, {30000, 40000}};
+    ReadRanges expected_second = {{10000, 20000}};
+    ReadRanges expected_third = {{20000, 30000}};
+    ASSERT_EQ(groups[0], expected_first);
+    ASSERT_EQ(groups[1], expected_second);
+    ASSERT_EQ(groups[2], expected_third);
+}
+
+// After RefreshReadRanges() on 101 rows with batch size 30 and 3 parallel readers, each
+// underlying mock reader must hold the read ranges asserted below.
+TEST_F(PrefetchFileBatchReaderImplTest, RefreshReadRanges) {
+    auto data_array = PrepareArray(101);
+    int32_t batch_size = 30;
+    int32_t prefetch_max_parallel_num = 3;
+    MockFormatReaderBuilder reader_builder(data_array, data_type_, batch_size);
+    ASSERT_OK_AND_ASSIGN(
+        auto reader,
+        PrefetchFileBatchReaderImpl::Create(
+            /*data_file_path=*/"", &reader_builder, mock_fs_, prefetch_max_parallel_num,
+            batch_size, prefetch_max_parallel_num * 2,
+            /*enable_adaptive_prefetch_strategy=*/false, executor_,
+            /*initialize_read_ranges=*/false,
+            /*prefetch_cache_mode=*/PrefetchCacheMode::ALWAYS, CacheConfig(),
+            GetDefaultPool()));
+    auto prefetch_reader = dynamic_cast<PrefetchFileBatchReaderImpl*>(reader.get());
+    ASSERT_OK(prefetch_reader->RefreshReadRanges());
+
+    // Each dynamic_cast is checked before use so an unexpected reader type fails the test
+    // cleanly instead of dereferencing a null pointer.
+    std::vector<std::pair<uint64_t, uint64_t>> read_ranges_0 = {{0, 30}, {90, 101}};
+    auto mock_reader_0 = dynamic_cast<MockFileBatchReader*>(prefetch_reader->readers_[0].get());
+    ASSERT_NE(mock_reader_0, nullptr);
+    ASSERT_EQ(mock_reader_0->GetReadRanges(), read_ranges_0);
+
+    std::vector<std::pair<uint64_t, uint64_t>> read_ranges_1 = {{30, 60}};
+    auto mock_reader_1 = dynamic_cast<MockFileBatchReader*>(prefetch_reader->readers_[1].get());
+    ASSERT_NE(mock_reader_1, nullptr);
+    ASSERT_EQ(mock_reader_1->GetReadRanges(), read_ranges_1);
+
+    std::vector<std::pair<uint64_t, uint64_t>> read_ranges_2 = {{60, 90}};
+    auto mock_reader_2 = dynamic_cast<MockFileBatchReader*>(prefetch_reader->readers_[2].get());
+    ASSERT_NE(mock_reader_2, nullptr);
+    ASSERT_EQ(mock_reader_2->GetReadRanges(), read_ranges_2);
+}
+
+// With the adaptive prefetch strategy enabled, a reader whose builder reports the single
+// read range {0, 100} and need_prefetch=true must end up with NeedPrefetch() == false.
+TEST_F(PrefetchFileBatchReaderImplTest, RefreshReadRangesDisablePrefetchByAdaptiveStrategy) {
+    const int32_t batch_size = 10;
+    const int32_t parallelism = 1;
+    auto source_rows = PrepareArray(200);
+    std::vector<std::pair<uint64_t, uint64_t>> single_range = {{0, 100}};
+    ControlledMockFormatReaderBuilder reader_builder(source_rows, data_type_, batch_size,
+                                                     /*read_ranges=*/single_range,
+                                                     /*need_prefetch=*/true,
+                                                     /*set_read_ranges_statuses=*/{});
+
+    ASSERT_OK_AND_ASSIGN(
+        auto reader,
+        PrefetchFileBatchReaderImpl::Create(
+            /*data_file_path=*/"", &reader_builder, mock_fs_, parallelism, batch_size,
+            /*prefetch_batch_count=*/2,
+            /*enable_adaptive_prefetch_strategy=*/true, executor_,
+            /*initialize_read_ranges=*/true,
+            /*prefetch_cache_mode=*/PrefetchCacheMode::ALWAYS, CacheConfig(),
+            GetDefaultPool()));
+
+    ASSERT_FALSE(reader->NeedPrefetch());
+}
+
+// SetReadRanges() must record the given ranges on the prefetch reader (plus a trailing
+// {400, 401} entry, as asserted below) and dispatch them to the underlying readers.
+TEST_F(PrefetchFileBatchReaderImplTest, SetReadRanges) {
+    auto data_array = PrepareArray(400);
+    int32_t batch_size = 30;
+    int32_t prefetch_max_parallel_num = 3;
+    MockFormatReaderBuilder reader_builder(data_array, data_type_, batch_size);
+    ASSERT_OK_AND_ASSIGN(
+        auto reader,
+        PrefetchFileBatchReaderImpl::Create(
+            /*data_file_path=*/"", &reader_builder, mock_fs_, prefetch_max_parallel_num,
+            batch_size, prefetch_max_parallel_num * 2,
+            /*enable_adaptive_prefetch_strategy=*/false, executor_,
+            /*initialize_read_ranges=*/false,
+            /*prefetch_cache_mode=*/PrefetchCacheMode::ALWAYS, CacheConfig(),
+            GetDefaultPool()));
+    auto prefetch_reader = dynamic_cast<PrefetchFileBatchReaderImpl*>(reader.get());
+    // Prefetching is off for this freshly created reader; switch it on before SetReadRanges().
+    ASSERT_FALSE(prefetch_reader->need_prefetch_);
+    prefetch_reader->need_prefetch_ = true;
+    std::vector<std::pair<uint64_t, uint64_t>> ranges = {
+        {0, 100}, {100, 200}, {200, 300}, {300, 400}};
+    ASSERT_OK(prefetch_reader->SetReadRanges(ranges));
+    auto& read_ranges_queue = prefetch_reader->read_ranges_;
+    std::vector<std::pair<uint64_t, uint64_t>> read_ranges;
+    for (auto& iter : read_ranges_queue) {
+        read_ranges.push_back(iter);
+    }
+    // The reader is expected to hold the input ranges followed by {400, 401}.
+    ranges.emplace_back(400, 401);
+    ASSERT_EQ(read_ranges, ranges);
+
+    // Each dynamic_cast is checked before use so an unexpected reader type fails the test
+    // cleanly instead of dereferencing a null pointer.
+    std::vector<std::pair<uint64_t, uint64_t>> read_ranges_0 = {{0, 100}, {300, 400}};
+    auto mock_reader_0 = dynamic_cast<MockFileBatchReader*>(prefetch_reader->readers_[0].get());
+    ASSERT_NE(mock_reader_0, nullptr);
+    ASSERT_EQ(mock_reader_0->GetReadRanges(), read_ranges_0);
+
+    std::vector<std::pair<uint64_t, uint64_t>> read_ranges_1 = {{100, 200}};
+    auto mock_reader_1 = dynamic_cast<MockFileBatchReader*>(prefetch_reader->readers_[1].get());
+    ASSERT_NE(mock_reader_1, nullptr);
+    ASSERT_EQ(mock_reader_1->GetReadRanges(), read_ranges_1);
+
+    std::vector<std::pair<uint64_t, uint64_t>> read_ranges_2 = {{200, 300}};
+    auto mock_reader_2 = dynamic_cast<MockFileBatchReader*>(prefetch_reader->readers_[2].get());
+    ASSERT_NE(mock_reader_2, nullptr);
+    ASSERT_EQ(mock_reader_2->GetReadRanges(), read_ranges_2);
+}
+
+// When one underlying reader fails SetReadRanges() (here the second reader built, with an
+// IOError), the prefetch reader's SetReadRanges() must return that error.
+TEST_F(PrefetchFileBatchReaderImplTest, SetReadRangesReturnErrorWhenPushDownFailed) {
+    auto data_array = PrepareArray(100);
+    int32_t batch_size = 10;
+    int32_t prefetch_max_parallel_num = 2;
+    ControlledMockFormatReaderBuilder reader_builder(
+        data_array, data_type_, batch_size,
+        /*read_ranges=*/{{0, 50}, {50, 100}},
+        /*need_prefetch=*/true,
+        /*set_read_ranges_statuses=*/
+        {Status::OK(), Status::IOError("set read ranges failed")});
+
+    ASSERT_OK_AND_ASSIGN(
+        auto reader,
+        PrefetchFileBatchReaderImpl::Create(
+            /*data_file_path=*/"", &reader_builder, mock_fs_, prefetch_max_parallel_num,
+            batch_size, prefetch_max_parallel_num * 2,
+            /*enable_adaptive_prefetch_strategy=*/false, executor_,
+            /*initialize_read_ranges=*/false,
+            /*prefetch_cache_mode=*/PrefetchCacheMode::ALWAYS, CacheConfig(),
+            GetDefaultPool()));
+
+    auto prefetch_reader = dynamic_cast<PrefetchFileBatchReaderImpl*>(reader.get());
+    prefetch_reader->need_prefetch_ = true;
+
+    Status status = prefetch_reader->SetReadRanges({{0, 50}, {50, 100}});
+    ASSERT_FALSE(status.ok());
+    ASSERT_TRUE(status.IsIOError());
+}
+
+// In PrefetchCacheMode::NEVER, NeedInitCache() must report false.
+TEST_F(PrefetchFileBatchReaderImplTest, NeedInitCacheNeverMode) {
+    const int32_t batch_size = 5;
+    const int32_t parallelism = 1;
+    auto source_rows = PrepareArray(10);
+    MockFormatReaderBuilder reader_builder(source_rows, data_type_, batch_size);
+    ASSERT_OK_AND_ASSIGN(
+        auto reader,
+        PrefetchFileBatchReaderImpl::Create(
+            /*data_file_path=*/"", &reader_builder, mock_fs_, parallelism, batch_size,
+            /*prefetch_batch_count=*/parallelism * 2,
+            /*enable_adaptive_prefetch_strategy=*/false, executor_,
+            /*initialize_read_ranges=*/false,
+            /*prefetch_cache_mode=*/PrefetchCacheMode::NEVER, CacheConfig(),
+            GetDefaultPool()));
+
+    auto impl = dynamic_cast<PrefetchFileBatchReaderImpl*>(reader.get());
+    ASSERT_FALSE(impl->NeedInitCache());
+}
+
+// Running Workloop() with a cache config whose range_size_limit is one above the uint32_t
+// maximum must leave an Invalid status in GetReadStatus().
+TEST_F(PrefetchFileBatchReaderImplTest, WorkloopSetReadStatusWhenCacheInitFailed) {
+    const int32_t batch_size = 5;
+    const int32_t parallelism = 1;
+    auto source_rows = PrepareArray(10);
+    MockFormatReaderBuilder reader_builder(source_rows, data_type_, batch_size);
+    CacheConfig invalid_cache_config(
+        /*buffer_size_limit=*/512 * 1024,
+        /*range_size_limit=*/static_cast<uint64_t>(std::numeric_limits<uint32_t>::max()) + 1,
+        /*hole_size_limit=*/8 * 1024,
+        /*pre_buffer_limit=*/128 * 1024);
+
+    ASSERT_OK_AND_ASSIGN(
+        auto reader,
+        PrefetchFileBatchReaderImpl::Create(
+            /*data_file_path=*/"", &reader_builder, mock_fs_, parallelism, batch_size,
+            /*prefetch_batch_count=*/parallelism * 2,
+            /*enable_adaptive_prefetch_strategy=*/false, executor_,
+            /*initialize_read_ranges=*/false,
+            /*prefetch_cache_mode=*/PrefetchCacheMode::ALWAYS, invalid_cache_config,
+            GetDefaultPool()));
+
+    auto impl = dynamic_cast<PrefetchFileBatchReaderImpl*>(reader.get());
+    // Workloop() is invoked directly on the test thread.
+    impl->Workloop();
+
+    Status read_status = impl->GetReadStatus();
+    ASSERT_FALSE(read_status.ok());
+    ASSERT_TRUE(read_status.IsInvalid());
+}
+
+// DoReadBatch() must return OK when the reader has already been flagged as shut down.
+TEST_F(PrefetchFileBatchReaderImplTest, DoReadBatchReturnOkWhenShutdown) {
+    const int32_t batch_size = 5;
+    const int32_t parallelism = 1;
+    auto source_rows = PrepareArray(10);
+    MockFormatReaderBuilder reader_builder(source_rows, data_type_, batch_size);
+    ASSERT_OK_AND_ASSIGN(
+        auto reader,
+        PrefetchFileBatchReaderImpl::Create(
+            /*data_file_path=*/"", &reader_builder, mock_fs_, parallelism, batch_size,
+            /*prefetch_batch_count=*/parallelism * 2,
+            /*enable_adaptive_prefetch_strategy=*/false, executor_,
+            /*initialize_read_ranges=*/false,
+            /*prefetch_cache_mode=*/PrefetchCacheMode::ALWAYS, CacheConfig(),
+            GetDefaultPool()));
+
+    auto impl = dynamic_cast<PrefetchFileBatchReaderImpl*>(reader.get());
+    // Set the shutdown flag directly instead of going through Close().
+    impl->is_shutdown_ = true;
+    ASSERT_OK(impl->DoReadBatch(/*reader_idx=*/0));
+}
+
+// DoReadBatch() must return OK when the reader's group holds no read range at all.
+TEST_F(PrefetchFileBatchReaderImplTest, DoReadBatchReturnOkWhenNoCurrentReadRange) {
+    const int32_t batch_size = 5;
+    const int32_t parallelism = 1;
+    auto source_rows = PrepareArray(10);
+    MockFormatReaderBuilder reader_builder(source_rows, data_type_, batch_size);
+    ASSERT_OK_AND_ASSIGN(
+        auto reader,
+        PrefetchFileBatchReaderImpl::Create(
+            /*data_file_path=*/"", &reader_builder, mock_fs_, parallelism, batch_size,
+            /*prefetch_batch_count=*/parallelism * 2,
+            /*enable_adaptive_prefetch_strategy=*/false, executor_,
+            /*initialize_read_ranges=*/false,
+            /*prefetch_cache_mode=*/PrefetchCacheMode::ALWAYS, CacheConfig(),
+            GetDefaultPool()));
+
+    auto impl = dynamic_cast<PrefetchFileBatchReaderImpl*>(reader.get());
+    // One group (for reader 0) that contains no ranges.
+    impl->read_ranges_in_group_ = {{}};
+    ASSERT_OK(impl->DoReadBatch(/*reader_idx=*/0));
+}
+
+// Reads 101 rows with a batch size (150) larger than the data and expects the full input
+// back.
+TEST_F(PrefetchFileBatchReaderImplTest, TestReadWithLargeBatchSize) {
+    const int32_t batch_size = 150;
+    const int32_t parallelism = 3;
+    auto source_rows = PrepareArray(101);
+    MockFormatReaderBuilder reader_builder(source_rows, data_type_, batch_size);
+    ASSERT_OK_AND_ASSIGN(
+        auto reader,
+        PrefetchFileBatchReaderImpl::Create(
+            /*data_file_path=*/"", &reader_builder, mock_fs_, parallelism, batch_size,
+            /*prefetch_batch_count=*/parallelism * 2,
+            /*enable_adaptive_prefetch_strategy=*/false, executor_,
+            /*initialize_read_ranges=*/true,
+            /*prefetch_cache_mode=*/PrefetchCacheMode::ALWAYS, CacheConfig(),
+            GetDefaultPool()));
+    ASSERT_EQ(reader->GetPreviousBatchFirstRowNumber().value(), -1);
+    ASSERT_OK_AND_ASSIGN(
+        auto collected_rows,
+        ReadResultCollector::CollectResult(reader.get(),
+                                           /*max simulated data processing time*/ 100));
+    ASSERT_EQ(reader->GetPreviousBatchFirstRowNumber().value(), 101);
+    auto expected_rows = std::make_shared<arrow::ChunkedArray>(source_rows);
+    ASSERT_TRUE(collected_rows->Equals(expected_rows));
+}
+
+// One of three underlying readers starts failing after the first batch was consumed:
+// batches prefetched before the failure are still returned, then the error surfaces.
+// NOTE: this test relies on sleeps to let the background prefetch make progress.
+TEST_F(PrefetchFileBatchReaderImplTest, TestPartialReaderSuccessRead) {
+    auto data_array = PrepareArray(101);
+    int32_t batch_size = 10;
+    int32_t prefetch_max_parallel_num = 3;
+    MockFormatReaderBuilder reader_builder(data_array, data_type_, batch_size);
+    ASSERT_OK_AND_ASSIGN(
+        auto reader,
+        PrefetchFileBatchReaderImpl::Create(
+            /*data_file_path=*/"", &reader_builder, mock_fs_, prefetch_max_parallel_num,
+            batch_size, prefetch_max_parallel_num,
+            /*enable_adaptive_prefetch_strategy=*/false, executor_,
+            /*initialize_read_ranges=*/true,
+            /*prefetch_cache_mode=*/PrefetchCacheMode::ALWAYS, CacheConfig(),
+            GetDefaultPool()));
+    auto prefetch_reader = dynamic_cast<PrefetchFileBatchReaderImpl*>(reader.get());
+    // Fixed batch sizes keep the row ranges quoted in the comments below predictable.
+    for (int32_t i = 0; i < prefetch_max_parallel_num; i++) {
+        auto mock_reader =
+            dynamic_cast<MockFileBatchReader*>(prefetch_reader->readers_[i].get());
+        ASSERT_NE(mock_reader, nullptr);
+        mock_reader->EnableRandomizeBatchSize(false);
+    }
+
+    arrow::ArrayVector result_array_vector;
+    ASSERT_EQ(reader->GetPreviousBatchFirstRowNumber().value(), -1);
+    ASSERT_OK_AND_ASSIGN(auto batch_with_bitmap, reader->NextBatchWithBitmap());
+    auto& [batch, bitmap] = batch_with_bitmap;
+    ASSERT_EQ(batch.first->length, bitmap.Cardinality());
+    ASSERT_EQ(reader->GetPreviousBatchFirstRowNumber().value(), 0);
+    ASSERT_OK_AND_ASSIGN(auto array, ReadResultCollector::GetArray(std::move(batch)));
+    result_array_vector.push_back(array);
+    ASSERT_OK(prefetch_reader->GetReadStatus());
+    usleep(100000);  // sleep 100ms to ensure that the other data has been pushed
+    ASSERT_TRUE(HasValue(prefetch_reader->prefetch_queues_));
+
+    // Set IOError for reader[1] after the first NextBatch().
+    // Now the data in prefetch_queues_[0] is [30,39], prefetch_queues_[1] is [10,19],
+    // prefetch_queues_[2] is [20,29],
+    // So, the IOError will occur at [40,49].
+    auto failing_reader = dynamic_cast<MockFileBatchReader*>(prefetch_reader->readers_[1].get());
+    ASSERT_NE(failing_reader, nullptr);
+    failing_reader->SetNextBatchStatus(Status::IOError("mock error"));
+    usleep(100000);
+    // pop [10,19]
+    ASSERT_OK_AND_ASSIGN(batch_with_bitmap, reader->NextBatchWithBitmap());
+    // now reader1 fetch [40,49] and set error status.
+    usleep(100000);
+    ASSERT_NOK(reader->NextBatchWithBitmap());
+    ReaderUtils::ReleaseReadBatch(std::move(batch_with_bitmap.first));
+}
+
+// When every underlying reader fails with an IOError, NextBatchWithBitmap() must return
+// that IOError, and keep returning it on subsequent calls.
+TEST_F(PrefetchFileBatchReaderImplTest, TestAllReaderFailedWithIOError) {
+    auto data_array = PrepareArray(101);
+    int32_t batch_size = 10;
+    int32_t prefetch_max_parallel_num = 3;
+    MockFormatReaderBuilder reader_builder(data_array, data_type_, batch_size);
+    ASSERT_OK_AND_ASSIGN(
+        auto reader,
+        PrefetchFileBatchReaderImpl::Create(
+            /*data_file_path=*/"", &reader_builder, mock_fs_, prefetch_max_parallel_num,
+            batch_size, prefetch_max_parallel_num * 2,
+            /*enable_adaptive_prefetch_strategy=*/false, executor_,
+            /*initialize_read_ranges=*/true,
+            /*prefetch_cache_mode=*/PrefetchCacheMode::ALWAYS, CacheConfig(),
+            GetDefaultPool()));
+
+    auto prefetch_reader = dynamic_cast<PrefetchFileBatchReaderImpl*>(reader.get());
+    for (int32_t i = 0; i < prefetch_max_parallel_num; i++) {
+        // Check the cast so an unexpected reader type fails the test instead of crashing.
+        auto mock_reader =
+            dynamic_cast<MockFileBatchReader*>(prefetch_reader->readers_[i].get());
+        ASSERT_NE(mock_reader, nullptr);
+        mock_reader->SetNextBatchStatus(Status::IOError("mock error"));
+    }
+
+    ASSERT_EQ(reader->GetPreviousBatchFirstRowNumber().value(), -1);
+    auto batch_result = reader->NextBatchWithBitmap();
+    // The failed read must not advance the previous-batch row number.
+    ASSERT_EQ(reader->GetPreviousBatchFirstRowNumber().value(), -1);
+    ASSERT_FALSE(batch_result.ok());
+    ASSERT_TRUE(batch_result.status().IsIOError());
+    ASSERT_FALSE(prefetch_reader->is_shutdown_);
+    ASSERT_NOK(prefetch_reader->GetReadStatus());
+    ASSERT_FALSE(HasValue(prefetch_reader->prefetch_queues_));
+
+    // call NextBatch again, will still return error status
+    auto batch_result2 = reader->NextBatchWithBitmap();
+    ASSERT_EQ(reader->GetPreviousBatchFirstRowNumber().value(), -1);
+    ASSERT_FALSE(batch_result2.ok());
+    ASSERT_TRUE(batch_result2.status().IsIOError());
+}
+
+// Reading a zero-row input must succeed, yield no result array and leave the previous-batch
+// row number at 0.
+TEST_F(PrefetchFileBatchReaderImplTest, TestPrefetchWithEmptyData) {
+    const int32_t batch_size = 10;
+    const int32_t parallelism = 3;
+    auto empty_rows = PrepareArray(0);
+    MockFormatReaderBuilder reader_builder(empty_rows, data_type_, batch_size);
+    ASSERT_OK_AND_ASSIGN(
+        auto reader,
+        PrefetchFileBatchReaderImpl::Create(
+            /*data_file_path=*/"", &reader_builder, mock_fs_, parallelism, batch_size,
+            /*prefetch_batch_count=*/parallelism * 2,
+            /*enable_adaptive_prefetch_strategy=*/false, executor_,
+            /*initialize_read_ranges=*/true,
+            /*prefetch_cache_mode=*/PrefetchCacheMode::ALWAYS, CacheConfig(),
+            GetDefaultPool()));
+    ASSERT_EQ(reader->GetPreviousBatchFirstRowNumber().value(), -1);
+    ASSERT_OK_AND_ASSIGN(
+        auto collected_rows,
+        ReadResultCollector::CollectResult(reader.get(),
+                                           /*max simulated data processing time*/ 100));
+    ASSERT_EQ(reader->GetPreviousBatchFirstRowNumber().value(), 0);
+    ASSERT_FALSE(collected_rows);
+}
+
+// After the reader has been drained, a further NextBatchWithBitmap() call must succeed and
+// return an EOF batch.
+TEST_F(PrefetchFileBatchReaderImplTest, TestCallNextBatchAfterReadingEof) {
+    const int32_t batch_size = 10;
+    const int32_t parallelism = 6;
+    auto source_rows = PrepareArray(10);
+    MockFormatReaderBuilder reader_builder(source_rows, data_type_, batch_size);
+    ASSERT_OK_AND_ASSIGN(
+        auto reader,
+        PrefetchFileBatchReaderImpl::Create(
+            /*data_file_path=*/"", &reader_builder, mock_fs_, parallelism, batch_size,
+            /*prefetch_batch_count=*/parallelism * 2,
+            /*enable_adaptive_prefetch_strategy=*/false, executor_,
+            /*initialize_read_ranges=*/true,
+            /*prefetch_cache_mode=*/PrefetchCacheMode::ALWAYS, CacheConfig(),
+            GetDefaultPool()));
+    ASSERT_EQ(reader->GetPreviousBatchFirstRowNumber().value(), -1);
+    ASSERT_OK_AND_ASSIGN(
+        auto collected_rows,
+        ReadResultCollector::CollectResult(reader.get(),
+                                           /*max simulated data processing time*/ 100));
+    ASSERT_EQ(reader->GetPreviousBatchFirstRowNumber().value(), 10);
+    auto expected_rows = std::make_shared<arrow::ChunkedArray>(source_rows);
+    ASSERT_TRUE(collected_rows->Equals(expected_rows));
+
+    // continue to call NextBatch() after reading eof
+    ASSERT_OK_AND_ASSIGN(auto after_eof, reader->NextBatchWithBitmap());
+    ASSERT_TRUE(BatchReader::IsEofBatch(after_eof));
+}
+
+// Creates a reader and lets it go out of scope without reading a single batch.
+// NOTE(review): presumably guards against hangs or crashes on teardown - confirm.
+TEST_F(PrefetchFileBatchReaderImplTest, TestCreateReaderWithoutNextBatch) {
+    const int32_t batch_size = 10;
+    const int32_t parallelism = 3;
+    auto source_rows = PrepareArray(101);
+    MockFormatReaderBuilder reader_builder(source_rows, data_type_, batch_size);
+    ASSERT_OK_AND_ASSIGN(
+        auto reader,
+        PrefetchFileBatchReaderImpl::Create(
+            /*data_file_path=*/"", &reader_builder, mock_fs_, parallelism, batch_size,
+            /*prefetch_batch_count=*/parallelism * 2,
+            /*enable_adaptive_prefetch_strategy=*/false, executor_,
+            /*initialize_read_ranges=*/true,
+            /*prefetch_cache_mode=*/PrefetchCacheMode::ALWAYS, CacheConfig(),
+            GetDefaultPool()));
+}
+
+// Create() must reject invalid arguments one at a time, and SeekToRow() must be reported as
+// unsupported on a validly created reader.
+TEST_F(PrefetchFileBatchReaderImplTest, TestInvalidCase) {
+    const int32_t batch_size = 10;
+    const int32_t parallelism = 3;
+    const std::string empty_path = "";
+    auto source_rows = PrepareArray(101);
+    MockFormatReaderBuilder reader_builder(source_rows, data_type_, batch_size);
+
+    // prefetch_max_parallel_num == 0
+    {
+        ASSERT_NOK(PrefetchFileBatchReaderImpl::Create(
+            empty_path, &reader_builder, mock_fs_,
+            /*prefetch_max_parallel_num=*/0, batch_size, 2,
+            /*enable_adaptive_prefetch_strategy=*/false, executor_,
+            /*initialize_read_ranges=*/true,
+            /*prefetch_cache_mode=*/PrefetchCacheMode::ALWAYS, CacheConfig(),
+            GetDefaultPool()));
+    }
+    // negative batch size
+    {
+        ASSERT_NOK(PrefetchFileBatchReaderImpl::Create(
+            empty_path, &reader_builder, mock_fs_, parallelism, /*batch_size=*/-1,
+            parallelism * 2,
+            /*enable_adaptive_prefetch_strategy=*/false, executor_,
+            /*initialize_read_ranges=*/true,
+            /*prefetch_cache_mode=*/PrefetchCacheMode::ALWAYS, CacheConfig(),
+            GetDefaultPool()));
+    }
+    // null executor
+    {
+        ASSERT_NOK(PrefetchFileBatchReaderImpl::Create(
+            empty_path, &reader_builder, mock_fs_, parallelism, batch_size, parallelism * 2,
+            /*enable_adaptive_prefetch_strategy=*/false,
+            /*executor=*/nullptr, /*initialize_read_ranges=*/true,
+            /*prefetch_cache_mode=*/PrefetchCacheMode::ALWAYS, CacheConfig(),
+            GetDefaultPool()));
+    }
+    // null reader builder
+    {
+        ASSERT_NOK(PrefetchFileBatchReaderImpl::Create(
+            empty_path, /*reader_builder=*/nullptr, mock_fs_, parallelism, batch_size,
+            parallelism * 2,
+            /*enable_adaptive_prefetch_strategy=*/false, executor_,
+            /*initialize_read_ranges=*/true,
+            /*prefetch_cache_mode=*/PrefetchCacheMode::ALWAYS, CacheConfig(),
+            GetDefaultPool()));
+    }
+    // null file system
+    {
+        ASSERT_NOK(PrefetchFileBatchReaderImpl::Create(
+            empty_path, &reader_builder,
+            /*fs=*/nullptr, parallelism, batch_size, parallelism * 2,
+            /*enable_adaptive_prefetch_strategy=*/false, executor_,
+            /*initialize_read_ranges=*/true,
+            /*prefetch_cache_mode=*/PrefetchCacheMode::ALWAYS, CacheConfig(),
+            GetDefaultPool()));
+    }
+    // valid reader, but seeking is not supported
+    {
+        ASSERT_OK_AND_ASSIGN(
+            auto reader,
+            PrefetchFileBatchReaderImpl::Create(
+                empty_path, &reader_builder, mock_fs_, parallelism, batch_size,
+                parallelism * 2,
+                /*enable_adaptive_prefetch_strategy=*/false, executor_,
+                /*initialize_read_ranges=*/true,
+                /*prefetch_cache_mode=*/PrefetchCacheMode::ALWAYS, CacheConfig(),
+                GetDefaultPool()));
+        ASSERT_NOK_WITH_MSG(reader->SeekToRow(/*row_number=*/101),
+                            "not support seek to row for prefetch reader");
+    }
+}
+
+/// Stripe-granularity pruning. The 90-row file is written as three stripes: [0,30), [30,60) and
+/// [60,90). The pushed-down predicate is `f1 < 20 OR f1 > 70`; the stripe [30,60) is expected to
+/// be filtered out completely while the other two stripes are returned whole.
+/// The read ranges are [0,30), [30,60), [60,90), so the expected result is [0,30) + [60,90).
+TEST_P(PrefetchFileBatchReaderImplTest, TestPrefetchWithPredicatePushdownWithCompleteFiltering) {
+    auto [file_format, cache_mode] = GetParam();
+
+    // 90 rows, 30 rows per stripe; the row index stride equals the stripe size.
+    auto data_array = PrepareArray(90);
+    PrepareTestData(file_format, data_array, /*stripe_row_count=*/30, /*row_index_stride=*/30);
+
+    // Predicate: f1 < 20 OR f1 > 70.
+    auto f1_below_20 = PredicateBuilder::LessThan(/*field_index=*/1, /*field_name=*/"f1",
+                                                  FieldType::BIGINT, Literal(20L));
+    auto f1_above_70 = PredicateBuilder::GreaterThan(/*field_index=*/1, /*field_name=*/"f1",
+                                                     FieldType::BIGINT, Literal(70L));
+    ASSERT_OK_AND_ASSIGN(auto predicate, PredicateBuilder::Or({f1_below_20, f1_above_70}));
+
+    auto schema = arrow::schema(fields_);
+    auto reader = PreparePrefetchReader(file_format, schema.get(), predicate,
+                                        /*selection_bitmap=*/std::nullopt, /*batch_size=*/10,
+                                        /*prefetch_max_parallel_num=*/3, cache_mode);
+
+    // The previous-batch first row number is expected to be -1 before any batch is read and 90
+    // once the reader has been drained.
+    ASSERT_EQ(reader->GetPreviousBatchFirstRowNumber().value(), -1);
+    ASSERT_OK_AND_ASSIGN(auto result_array,
+                         ReadResultCollector::CollectResult(
+                             reader.get(), /*max simulated data processing time*/ 100));
+    ASSERT_EQ(reader->GetPreviousBatchFirstRowNumber().value(), 90);
+
+    // Expected output: rows [0,30) and rows [60,90), as two chunks.
+    arrow::ArrayVector expected_chunks{data_array->Slice(0, 30), data_array->Slice(60, 30)};
+    auto expected_array = std::make_shared<arrow::ChunkedArray>(expected_chunks);
+    ASSERT_TRUE(CheckEqual(expected_array, result_array));
+}
+
+/// Row-group-granularity pruning. The 90-row file is written as three stripes: [0,30), [30,60)
+/// and [60,90), each holding 3 row groups of 10 rows. The pushed-down predicate is
+/// `f1 < 20 OR f1 > 70`; only the row groups covering [0,20) and [70,90) are expected to remain.
+/// The read ranges are [0,30), [30,60), [60,90).
+TEST_P(PrefetchFileBatchReaderImplTest,
+       TestPrefetchWithOrcPredicatePushdownWithRowGroupGranularity) {
+    auto [file_format, cache_mode] = GetParam();
+
+    // 90 rows, 30 rows per stripe, row index stride of 10 rows.
+    auto data_array = PrepareArray(90);
+    PrepareTestData(file_format, data_array, /*stripe_row_count=*/30, /*row_index_stride=*/10);
+
+    // Predicate: f1 < 20 OR f1 > 70.
+    auto f1_below_20 = PredicateBuilder::LessThan(/*field_index=*/1, /*field_name=*/"f1",
+                                                  FieldType::BIGINT, Literal(20L));
+    auto f1_above_70 = PredicateBuilder::GreaterThan(/*field_index=*/1, /*field_name=*/"f1",
+                                                     FieldType::BIGINT, Literal(70L));
+    ASSERT_OK_AND_ASSIGN(auto predicate, PredicateBuilder::Or({f1_below_20, f1_above_70}));
+
+    auto schema = arrow::schema(fields_);
+    auto reader = PreparePrefetchReader(file_format, schema.get(), predicate,
+                                        /*selection_bitmap=*/std::nullopt, /*batch_size=*/10,
+                                        /*prefetch_max_parallel_num=*/3, cache_mode);
+
+    // Refresh the read ranges explicitly before reading anything.
+    ASSERT_OK(reader->RefreshReadRanges());
+
+    // The previous-batch first row number is expected to be -1 before any batch is read and 90
+    // once the reader has been drained.
+    ASSERT_EQ(reader->GetPreviousBatchFirstRowNumber().value(), -1);
+    ASSERT_OK_AND_ASSIGN(auto result_array,
+                         ReadResultCollector::CollectResult(
+                             reader.get(), /*max simulated data processing time*/ 100));
+    ASSERT_EQ(reader->GetPreviousBatchFirstRowNumber().value(), 90);
+
+    // Expected output: rows [0,20) and rows [70,90), as two chunks.
+    arrow::ArrayVector expected_chunks{data_array->Slice(0, 20), data_array->Slice(70, 20)};
+    auto expected_array = std::make_shared<arrow::ChunkedArray>(expected_chunks);
+    ASSERT_TRUE(CheckEqual(expected_array, result_array));
+}
+
+/// Selection-bitmap read through a mock format reader: 5120 row ids are drawn at random from a
+/// 10000-row array (duplicates collapse in the set), and the prefetch reader's output must equal
+/// the full data with the same bitmap applied.
+TEST_F(PrefetchFileBatchReaderImplTest, TestPrefetchWithBitmap) {
+    auto data_array = PrepareArray(10000);
+
+    // Draw 5120 random row ids; std::set de-duplicates and orders them.
+    const auto last_row_id = data_array->length() - 1;
+    std::set<int32_t> valid_row_ids;
+    for (int32_t draws_left = 5120; draws_left > 0; --draws_left) {
+        valid_row_ids.insert(paimon::test::RandomNumber(0, last_row_id));
+    }
+    std::vector<int32_t> selected_rows(valid_row_ids.begin(), valid_row_ids.end());
+    auto bitmap = RoaringBitmap32::From(selected_rows);
+
+    // The mock builder is handed the data, its type and the selection bitmap.
+    MockFormatReaderBuilder reader_builder(data_array, data_type_, bitmap,
+                                           /*read_batch_size=*/100);
+    int32_t prefetch_max_parallel_num = 3;
+    ASSERT_OK_AND_ASSIGN(
+        auto reader,
+        PrefetchFileBatchReaderImpl::Create(
+            /*data_file_path=*/"", &reader_builder, mock_fs_, prefetch_max_parallel_num,
+            /*batch_size=*/100, prefetch_max_parallel_num * 2,
+            /*enable_adaptive_prefetch_strategy=*/false, executor_,
+            /*initialize_read_ranges=*/true, /*prefetch_cache_mode=*/PrefetchCacheMode::ALWAYS,
+            CacheConfig(), GetDefaultPool()));
+
+    // Actual: everything the prefetch reader produces.
+    ASSERT_OK_AND_ASSIGN(auto result_chunk_array,
+                         ReadResultCollector::CollectResult(reader.get()));
+
+    // Expected: the whole data array as one batch with the bitmap applied to it.
+    ASSERT_OK_AND_ASSIGN(auto data_batch, ReadResultCollector::GetReadBatch(data_array));
+    ASSERT_OK_AND_ASSIGN(auto expected_batch,
+                         ReaderUtils::ApplyBitmapToReadBatch(
+                             std::make_pair(std::move(data_batch), bitmap),
+                             arrow::default_memory_pool()));
+    ASSERT_OK_AND_ASSIGN(auto expected_array,
+                         ReadResultCollector::GetArray(std::move(expected_batch)));
+    auto expected_chunk_array = std::make_shared<arrow::ChunkedArray>(expected_array);
+    ASSERT_TRUE(result_chunk_array->Equals(expected_chunk_array));
+}
+
+} // namespace paimon::test