This is an automated email from the ASF dual-hosted git repository.
leaves12138 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-cpp.git
The following commit(s) were added to refs/heads/main by this push:
new 9ce823a feat: add batch reader infrastructure (#46)
9ce823a is described below
commit 9ce823a6fcf2087e7dd7ced8db1158046e160ca7
Author: lszskye <[email protected]>
AuthorDate: Thu Jun 4 16:31:18 2026 +0800
feat: add batch reader infrastructure (#46)
---
include/paimon/reader/batch_reader.h | 87 +++++++++
include/paimon/reader/file_batch_reader.h | 62 +++++++
include/paimon/reader/prefetch_file_batch_reader.h | 99 ++++++++++
src/paimon/common/reader/batch_reader.cpp | 46 +++++
.../reader/complete_row_kind_batch_reader.cpp | 111 ++++++++++++
.../common/reader/complete_row_kind_batch_reader.h | 73 ++++++++
.../reader/complete_row_kind_batch_reader_test.cpp | 172 ++++++++++++++++++
src/paimon/common/reader/concat_batch_reader.cpp | 69 +++++++
src/paimon/common/reader/concat_batch_reader.h | 51 ++++++
.../common/reader/concat_batch_reader_test.cpp | 172 ++++++++++++++++++
.../common/reader/delegating_prefetch_reader.h | 89 +++++++++
.../common/reader/predicate_batch_reader.cpp | 107 +++++++++++
src/paimon/common/reader/predicate_batch_reader.h | 70 +++++++
.../common/reader/predicate_batch_reader_test.cpp | 201 +++++++++++++++++++++
src/paimon/common/reader/reader_utils.cpp | 113 ++++++++++++
src/paimon/common/reader/reader_utils.h | 61 +++++++
src/paimon/common/reader/reader_utils_test.cpp | 107 +++++++++++
17 files changed, 1690 insertions(+)
diff --git a/include/paimon/reader/batch_reader.h
b/include/paimon/reader/batch_reader.h
new file mode 100644
index 0000000..07835c4
--- /dev/null
+++ b/include/paimon/reader/batch_reader.h
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include <memory>
+#include <utility>
+
+#include "paimon/metrics.h"
+#include "paimon/result.h"
+#include "paimon/utils/roaring_bitmap32.h"
+#include "paimon/visibility.h"
+
+struct ArrowArray; // IWYU pragma: keep
+struct ArrowSchema; // IWYU pragma: keep
+
namespace paimon {
/// A batch reader that reads data batch by batch into Arrow C data interface structures
/// (`ArrowArray` / `ArrowSchema`).
class PAIMON_EXPORT BatchReader {
 public:
    virtual ~BatchReader() = default;
    /// One batch: the exported array and its exported schema. A batch whose array pointer is
    /// null is the EOF batch (see `IsEofBatch()` / `MakeEofBatch()`).
    using ReadBatch = std::pair<std::unique_ptr<ArrowArray>, std::unique_ptr<ArrowSchema>>;
    /// A batch paired with a bitmap of the row ids (within the batch) that are valid.
    using ReadBatchWithBitmap = std::pair<ReadBatch, RoaringBitmap32>;

    /// Retrieves the next batch of data.
    ///
    /// When EOF is reached, returns an OK status holding a null array. An error status is
    /// returned only for critical failures (e.g., IO errors). Once an error has been returned,
    /// this method must not be called again: it would keep returning the same error.
    ///
    /// @return A result containing a `::ReadBatch`, i.e. a unique pointer to an `ArrowArray`
    /// and a unique pointer to its `ArrowSchema`. The returned array contains a `_VALUE_KIND`
    /// field (the first field) that indicates the row kind of each row. Deleted and
    /// index-filtered rows have already been removed.
    virtual Result<ReadBatch> NextBatch() = 0;

    /// Retrieves the next batch of data together with a bitmap of valid rows.
    ///
    /// When EOF is reached, returns an OK status holding a null array. An error status is
    /// returned only for critical failures (e.g., IO errors). Once an error has been returned,
    /// this method must not be called again: it would keep returning the same error.
    ///
    /// @return A result containing a `::ReadBatch` and a bitmap of valid row ids. The
    /// `::ReadBatch` consists of a unique pointer to an `ArrowArray` and a unique pointer to
    /// its `ArrowSchema`; the array contains a `_VALUE_KIND` field (the first field) that
    /// indicates the row kind of each row. Deleted or index-filtered rows may still be present
    /// in the `::ReadBatch`; the bitmap marks which row ids are valid. When deletion vectors or
    /// indexes are enabled, this method is more efficient than `NextBatch()`. The default
    /// implementation calls `NextBatch()` and marks every row of the returned batch as valid.
    /// Note that for a non-EOF batch the returned bitmap contains at least one valid row id.
    virtual Result<ReadBatchWithBitmap> NextBatchWithBitmap();

    /// Retrieves the reader's metrics.
    /// Note that calling this method frequently may incur significant performance overhead.
    /// @return A shared pointer to the `Metrics` object.
    virtual std::shared_ptr<Metrics> GetReaderMetrics() const = 0;

    /// Closes the `BatchReader`, releasing any associated resources.
    /// After this method has been called, the behavior of further `NextBatch()` calls is
    /// undefined; avoid them.
    virtual void Close() = 0;

    /// Determines whether a `::ReadBatch` or `::ReadBatchWithBitmap` is the EOF batch. If this
    /// returns true, all data has been returned.
    static bool IsEofBatch(const ReadBatch& batch);
    static bool IsEofBatch(const ReadBatchWithBitmap& batch_with_bitmap);

    /// Creates an EOF batch (respectively an EOF batch with bitmap).
    static ReadBatch MakeEofBatch();
    static ReadBatchWithBitmap MakeEofBatchWithBitmap();
};
} // namespace paimon
diff --git a/include/paimon/reader/file_batch_reader.h
b/include/paimon/reader/file_batch_reader.h
new file mode 100644
index 0000000..38c89f0
--- /dev/null
+++ b/include/paimon/reader/file_batch_reader.h
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include <memory>
+#include <utility>
+#include <vector>
+
+#include "paimon/predicate/predicate.h"
+#include "paimon/reader/batch_reader.h"
+#include "paimon/utils/roaring_bitmap32.h"
+namespace paimon {
+/// The batch reader for a single file supports returning the line number of
the last batch read for
+/// deletion vector judgment.
+class PAIMON_EXPORT FileBatchReader : public BatchReader {
+ public:
+ /// @return The schema of the file.
+ virtual Result<std::unique_ptr<::ArrowSchema>> GetFileSchema() const = 0;
+
+ /// Resets the read schema and predicate.
+ ///
+ /// If `SetReadSchema()` is not called, `NextBatch()` will return data
with the file schema.
+ /// After resetting the read schema, `NextBatch()` will read data starting
from the first row.
+ ///
+ /// @param read_schema The schema to set for reading.
+ /// @param predicate The predicate to apply for filtering data.
+ /// @param selection_bitmap The bitmap to apply for filtering data.
+ /// @return The status of the operation.
+ virtual Status SetReadSchema(::ArrowSchema* read_schema,
+ const std::shared_ptr<Predicate>& predicate,
+ const std::optional<RoaringBitmap32>&
selection_bitmap) = 0;
+ using BatchReader::NextBatch;
+ using BatchReader::NextBatchWithBitmap;
+
+ /// Get the row number of the first row in the previously read batch.
+ virtual Result<uint64_t> GetPreviousBatchFirstRowNumber() const = 0;
+
+ /// Get the number of rows in the file.
+ virtual Result<uint64_t> GetNumberOfRows() const = 0;
+
+ /// Get whether or not support read precisely while bitmap pushed down.
+ virtual bool SupportPreciseBitmapSelection() const = 0;
+};
+
+} // namespace paimon
diff --git a/include/paimon/reader/prefetch_file_batch_reader.h
b/include/paimon/reader/prefetch_file_batch_reader.h
new file mode 100644
index 0000000..acc7d0b
--- /dev/null
+++ b/include/paimon/reader/prefetch_file_batch_reader.h
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include <memory>
+#include <utility>
+#include <vector>
+
+#include "paimon/reader/file_batch_reader.h"
+
+namespace paimon {
+
+/// The prefetch file batch reader extends the basic FileBatchReader interface
for prefetch read,
+/// if a format implementation inherits from this class, it will automatically
support the C++
+/// Paimon prefetch capability and integrate with the Paimon prefetch
framework.
+class PAIMON_EXPORT PrefetchFileBatchReader : public FileBatchReader {
+ public:
+ /// Seeks to a specific row in the file.
+ /// @param row_number The row number to seek to.
+ /// @return The status of the operation.
+ virtual Status SeekToRow(uint64_t row_number) = 0;
+
+ /// Retrieves the row number of the next row to be read.
+ /// This method indicates the current read position within the file.
+ /// @return The row number of the next row to read.
+ virtual uint64_t GetNextRowToRead() const = 0;
+
+ /// Generates a list of row ranges to be read in batches.
+ /// Each range specifies the start and end row numbers for a batch,
+ /// allowing for efficient batch processing.
+ ///
+ /// The underlying format layer (e.g., parquet) is responsible for
determining
+ /// the most effective way to split the data. This could be by row groups,
stripes,
+ /// or other internal data structures. The key principle is to split the
data
+ /// into contiguous, seekable ranges to minimize read amplification.
+ ///
+ /// For example:
+ /// - A parquet format could split by RowGroup directly, ensuring each
range aligns
+ /// with a single RowGroup.
+ ///
+ /// The smallest splittable unit must be seekable to its start position,
and the
+ /// splitting strategy should aim to avoid read amplification.
+ ///
+ /// @param need_prefetch A pointer to a boolean. The format layer sets
this to indicate whether
+ /// prefetching is beneficial for the current scenario, to avoid
performance regression in
+ /// certain cases.
+ /// @return A vector of pairs, where each pair represents a range with a
start and end row
+ /// number.
+ virtual Result<std::vector<std::pair<uint64_t, uint64_t>>> GenReadRanges(
+ bool* need_prefetch) const = 0;
+
+ /// Sets the specific row ranges as a hint to be read from format file.
+ ///
+ /// If the specific file format does not support explicit range-based
reads, implementations may
+ /// gracefully ignore this hint and provide an empty (no-op)
implementation.
+ ///
+ /// @param read_ranges A vector of pairs, where each pair defines a
half-open interval
+ /// `[start_row, end_row)`. The `start_row` is inclusive, and the
`end_row` is exclusive.
+ virtual Status SetReadRanges(const std::vector<std::pair<uint64_t,
uint64_t>>& read_ranges) = 0;
+
+ /// Returns a list of file offset/length ranges that should be prefetched
for the current read
+ /// scenario.
+ ///
+ /// This method should analyze the columns selected by the user and return
the minimal set of
+ /// physical file ranges (offset, length) that need to be read, avoiding
unnecessary IO
+ /// amplification. For example, if only a subset of columns is requested,
the implementation
+ /// should only return the byte ranges corresponding to those columns,
rather than the entire
+ /// row group or block.
+ ///
+ /// This enables the cache to prefetch only the required data, reducing
disk and network load
+ /// and improving performance for columnar formats and selective queries.
+ ///
+ /// By default, returns an empty list (no prefetching). Format-specific
implementations should
+ /// override this method to provide accurate offset/length hints for
efficient IO.
+ /// @return A vector of pairs, where each pair contains the file offset
and length to be
+ /// prefetched.
+ virtual Result<std::vector<std::pair<uint64_t, uint64_t>>>
PreBufferRange() {
+ return std::vector<std::pair<uint64_t, uint64_t>>{};
+ }
+};
+
+} // namespace paimon
diff --git a/src/paimon/common/reader/batch_reader.cpp
b/src/paimon/common/reader/batch_reader.cpp
new file mode 100644
index 0000000..1a1734d
--- /dev/null
+++ b/src/paimon/common/reader/batch_reader.cpp
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "paimon/reader/batch_reader.h"
+
+#include "arrow/c/abi.h"
+#include "paimon/common/reader/reader_utils.h"
+
+namespace paimon {
+BatchReader::ReadBatch BatchReader::MakeEofBatch() {
+ return std::make_pair(std::unique_ptr<ArrowArray>(),
std::unique_ptr<ArrowSchema>());
+}
+BatchReader::ReadBatchWithBitmap BatchReader::MakeEofBatchWithBitmap() {
+ return std::make_pair(MakeEofBatch(), RoaringBitmap32());
+}
+
+bool BatchReader::IsEofBatch(const ReadBatch& batch) {
+ return batch.first == nullptr;
+}
+
+bool BatchReader::IsEofBatch(const ReadBatchWithBitmap& batch_with_bitmap) {
+ return batch_with_bitmap.first.first == nullptr;
+}
+
+Result<BatchReader::ReadBatchWithBitmap> BatchReader::NextBatchWithBitmap() {
+ PAIMON_ASSIGN_OR_RAISE(BatchReader::ReadBatch batch, NextBatch());
+ return ReaderUtils::AddAllValidBitmap(std::move(batch));
+}
+
+} // namespace paimon
diff --git a/src/paimon/common/reader/complete_row_kind_batch_reader.cpp
b/src/paimon/common/reader/complete_row_kind_batch_reader.cpp
new file mode 100644
index 0000000..732f9b2
--- /dev/null
+++ b/src/paimon/common/reader/complete_row_kind_batch_reader.cpp
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "paimon/common/reader/complete_row_kind_batch_reader.h"
+
+#include <cstddef>
+
+#include "arrow/api.h"
+#include "arrow/array/array_base.h"
+#include "arrow/array/array_nested.h"
+#include "arrow/array/util.h"
+#include "arrow/c/abi.h"
+#include "arrow/c/bridge.h"
+#include "arrow/scalar.h"
+#include "paimon/common/reader/reader_utils.h"
+#include "paimon/common/table/special_fields.h"
+#include "paimon/common/types/data_field.h"
+#include "paimon/common/types/row_kind.h"
+#include "paimon/common/utils/arrow/status_utils.h"
+#include "paimon/status.h"
+
+namespace paimon {
+
+Result<BatchReader::ReadBatch> CompleteRowKindBatchReader::NextBatch() {
+ PAIMON_ASSIGN_OR_RAISE(BatchReader::ReadBatchWithBitmap batch_with_bitmap,
+ NextBatchWithBitmap());
+ return ReaderUtils::ApplyBitmapToReadBatch(std::move(batch_with_bitmap),
arrow_pool_.get());
+}
+
+Result<BatchReader::ReadBatchWithBitmap>
CompleteRowKindBatchReader::NextBatchWithBitmap() {
+ PAIMON_ASSIGN_OR_RAISE(BatchReader::ReadBatchWithBitmap batch_with_bitmap,
+ reader_->NextBatchWithBitmap());
+ if (BatchReader::IsEofBatch(batch_with_bitmap)) {
+ return batch_with_bitmap;
+ }
+ auto& [batch, bitmap] = batch_with_bitmap;
+ auto& [c_array, c_schema] = batch;
+ PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(std::shared_ptr<arrow::Array>
arrow_array,
+ arrow::ImportArray(c_array.get(),
c_schema.get()));
+ auto struct_array =
std::dynamic_pointer_cast<arrow::StructArray>(arrow_array);
+ if (!struct_array) {
+ return Status::Invalid("cannot cast array to StructArray in
CompleteRowKindBatchReader");
+ }
+ if (struct_array->GetFieldByName(SpecialFields::ValueKind().Name())) {
+ // batch returned by reader_ has value kind, just return
+ PAIMON_RETURN_NOT_OK_FROM_ARROW(
+ arrow::ExportArray(*struct_array, c_array.get(), c_schema.get()));
+ return batch_with_bitmap;
+ }
+ // create value kind array, all are insert
+ PAIMON_ASSIGN_OR_RAISE(std::shared_ptr<arrow::Array> row_kind_array,
+ PrepareRowKindArray(struct_array->length()));
+ // complete row kind
+ UpdateFieldNamesWithRowKind(struct_array);
+ arrow::ArrayVector fields_with_row_kind = {row_kind_array};
+ fields_with_row_kind.insert(fields_with_row_kind.end(),
struct_array->fields().begin(),
+ struct_array->fields().end());
+ PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(
+ std::shared_ptr<arrow::StructArray> array_with_row_kind,
+ arrow::StructArray::Make(fields_with_row_kind,
field_names_with_row_kind_));
+ PAIMON_RETURN_NOT_OK_FROM_ARROW(
+ arrow::ExportArray(*array_with_row_kind, c_array.get(),
c_schema.get()));
+ return batch_with_bitmap;
+}
+
+Result<std::shared_ptr<arrow::Array>>
CompleteRowKindBatchReader::PrepareRowKindArray(
+ int32_t struct_array_length) {
+ if (!row_kind_array_ || row_kind_array_->length() < struct_array_length) {
+ auto row_kind_scalar =
+
std::make_shared<arrow::Int8Scalar>(RowKind::Insert()->ToByteValue());
+ PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(
+ row_kind_array_,
+ arrow::MakeArrayFromScalar(*row_kind_scalar, struct_array_length,
arrow_pool_.get()));
+ return row_kind_array_;
+ } else {
+ return row_kind_array_->Slice(0, struct_array_length);
+ }
+}
+
+void CompleteRowKindBatchReader::UpdateFieldNamesWithRowKind(
+ const std::shared_ptr<arrow::StructArray>& struct_array) {
+ if (static_cast<size_t>(struct_array->struct_type()->num_fields()) + 1 ==
+ field_names_with_row_kind_.size()) {
+ return;
+ }
+ field_names_with_row_kind_.clear();
+ const auto& fields = struct_array->struct_type()->fields();
+ field_names_with_row_kind_.reserve(fields.size() + 1);
+ field_names_with_row_kind_.push_back(SpecialFields::ValueKind().Name());
+ for (const auto& field : fields) {
+ field_names_with_row_kind_.push_back(field->name());
+ }
+}
+
+} // namespace paimon
diff --git a/src/paimon/common/reader/complete_row_kind_batch_reader.h
b/src/paimon/common/reader/complete_row_kind_batch_reader.h
new file mode 100644
index 0000000..021dfb4
--- /dev/null
+++ b/src/paimon/common/reader/complete_row_kind_batch_reader.h
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include <cstdint>
+#include <memory>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include "arrow/api.h"
+#include "arrow/array/array_base.h"
+#include "paimon/common/utils/arrow/mem_utils.h"
+#include "paimon/reader/batch_reader.h"
+#include "paimon/result.h"
+
+namespace arrow {
+class StructArray;
+} // namespace arrow
+
namespace paimon {
class MemoryPool;
class Metrics;

/// A `BatchReader` decorator that makes every returned batch carry the `_VALUE_KIND` field
/// as its first field. Batches from the wrapped reader that already contain `_VALUE_KIND` are
/// passed through; otherwise an int8 column filled with the insert row kind is prepended.
class CompleteRowKindBatchReader : public BatchReader {
 public:
    /// @param reader The wrapped reader; ownership is transferred to this object.
    /// @param pool The memory pool from which the Arrow memory pool used by this reader is
    /// derived (via `GetArrowPool()`).
    CompleteRowKindBatchReader(std::unique_ptr<BatchReader>&& reader,
                               const std::shared_ptr<MemoryPool>& pool)
        : arrow_pool_(GetArrowPool(pool)), reader_(std::move(reader)) {}

    /// Returns the next batch with the bitmap already applied to it.
    Result<ReadBatch> NextBatch() override;

    /// Returns the next batch (completed with `_VALUE_KIND`) and the wrapped reader's bitmap.
    Result<ReadBatchWithBitmap> NextBatchWithBitmap() override;

    /// Closes the wrapped reader and drops the cached row-kind column and field names.
    void Close() override {
        reader_->Close();
        row_kind_array_.reset();
        field_names_with_row_kind_.clear();
    }

    /// Forwards to the wrapped reader's metrics.
    std::shared_ptr<Metrics> GetReaderMetrics() const override {
        return reader_->GetReaderMetrics();
    }

 private:
    /// Returns an insert-only row-kind array of the given length (cached across batches).
    Result<std::shared_ptr<arrow::Array>> PrepareRowKindArray(int32_t struct_array_length);

    /// Refreshes `field_names_with_row_kind_` for the fields of `struct_array`.
    void UpdateFieldNamesWithRowKind(const std::shared_ptr<arrow::StructArray>& struct_array);

 private:
    // NOTE: declaration order matters. Members are destroyed in reverse order, so
    // `arrow_pool_` (declared first) outlives `row_kind_array_`, whose buffers are allocated
    // from it in PrepareRowKindArray().
    std::unique_ptr<arrow::MemoryPool> arrow_pool_;
    std::unique_ptr<BatchReader> reader_;
    // Cached insert-only row-kind column, reused (sliced) across batches.
    std::shared_ptr<arrow::Array> row_kind_array_;
    // Cached `_VALUE_KIND` + inner field names used to build the output struct.
    std::vector<std::string> field_names_with_row_kind_;
};
} // namespace paimon
diff --git a/src/paimon/common/reader/complete_row_kind_batch_reader_test.cpp
b/src/paimon/common/reader/complete_row_kind_batch_reader_test.cpp
new file mode 100644
index 0000000..ab0dc2a
--- /dev/null
+++ b/src/paimon/common/reader/complete_row_kind_batch_reader_test.cpp
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "paimon/common/reader/complete_row_kind_batch_reader.h"
+
+#include <map>
+#include <optional>
+
+#include "arrow/api.h"
+#include "arrow/array/array_base.h"
+#include "arrow/c/abi.h"
+#include "arrow/c/bridge.h"
+#include "arrow/ipc/json_simple.h"
+#include "gtest/gtest.h"
+#include "paimon/common/table/special_fields.h"
+#include "paimon/common/types/data_field.h"
+#include "paimon/format/file_format.h"
+#include "paimon/format/file_format_factory.h"
+#include "paimon/fs/file_system.h"
+#include "paimon/fs/local/local_file_system.h"
+#include "paimon/memory/memory_pool.h"
+#include "paimon/status.h"
+#include "paimon/testing/mock/mock_file_batch_reader.h"
+#include "paimon/testing/utils/read_result_collector.h"
+#include "paimon/testing/utils/testharness.h"
+
+namespace paimon::test {
+
+class CompleteRowKindBatchReaderTest : public ::testing::Test {
+ public:
+ void SetUp() override {
+ pool_ = GetDefaultPool();
+ }
+
+ void TearDown() override {
+ pool_.reset();
+ }
+
+ std::unique_ptr<BatchReader> PrepareCompleteRowKindBatchReader(
+ const std::string& file_name, const std::shared_ptr<arrow::Schema>
read_schema,
+ int32_t batch_size, const std::map<std::string, std::string>& options
= {}) const {
+ EXPECT_OK_AND_ASSIGN(auto file_format, FileFormatFactory::Get("orc",
options));
+ EXPECT_OK_AND_ASSIGN(auto reader_builder,
file_format->CreateReaderBuilder(batch_size));
+ std::shared_ptr<FileSystem> file_system =
std::make_shared<LocalFileSystem>();
+ EXPECT_OK_AND_ASSIGN(std::shared_ptr<InputStream> input_stream,
+ file_system->Open(file_name));
+ EXPECT_OK_AND_ASSIGN(auto orc_batch_reader,
reader_builder->Build(input_stream));
+
+ std::unique_ptr<ArrowSchema> c_schema =
std::make_unique<ArrowSchema>();
+ auto arrow_status = arrow::ExportSchema(*read_schema, c_schema.get());
+ EXPECT_TRUE(arrow_status.ok());
+ EXPECT_OK(orc_batch_reader->SetReadSchema(c_schema.get(),
/*predicate=*/nullptr,
+
/*selection_bitmap=*/std::nullopt));
+ return
std::make_unique<CompleteRowKindBatchReader>(std::move(orc_batch_reader),
pool_);
+ }
+
+ std::unique_ptr<BatchReader> PrepareCompleteRowKindBatchReader(
+ const std::shared_ptr<arrow::Array>& src_array, int32_t batch_size)
const {
+ auto file_batch_reader =
+ std::make_unique<MockFileBatchReader>(src_array,
src_array->type(), batch_size);
+ return
std::make_unique<CompleteRowKindBatchReader>(std::move(file_batch_reader),
pool_);
+ }
+
+ private:
+ std::shared_ptr<MemoryPool> pool_;
+};
+
+TEST_F(CompleteRowKindBatchReaderTest, TestSimple) {
+ std::string file_name = paimon::test::GetDataDir() +
+
"/orc/pk_table_scan_and_read_mor.db/pk_table_scan_and_read_mor/f1=20/"
+
"bucket-0/data-1bd5fd24-4e7d-4ea2-9436-86df0a54b14a-0.orc";
+ std::vector<DataField> read_fields = {DataField(0, arrow::field("f0",
arrow::utf8())),
+ DataField(1, arrow::field("f1",
arrow::int32())),
+ DataField(2, arrow::field("f2",
arrow::int32())),
+ DataField(3, arrow::field("f3",
arrow::float64()))};
+ auto read_schema = DataField::ConvertDataFieldsToArrowSchema(read_fields);
+
+ auto reader = PrepareCompleteRowKindBatchReader(file_name, read_schema,
/*batch_size=*/1);
+ ASSERT_OK_AND_ASSIGN(auto result_array,
ReadResultCollector::CollectResult(reader.get()));
+
+ std::shared_ptr<arrow::ChunkedArray> expected_array;
+ std::vector<DataField> result_fields = read_fields;
+ result_fields.insert(result_fields.begin(), SpecialFields::ValueKind());
+ auto result_schema =
DataField::ConvertDataFieldsToArrowSchema(result_fields);
+ auto array_status = arrow::ipc::internal::json::ChunkedArrayFromJSON(
+ arrow::struct_(result_schema->fields()), {R"([
+ [0, "Lucy", 20, 1, 14.1],
+ [0, "Paul", 20, 1, 18.1]
+])"},
+ &expected_array);
+ ASSERT_TRUE(array_status.ok());
+ ASSERT_TRUE(expected_array->Equals(*result_array));
+ reader->Close();
+}
+
+TEST_F(CompleteRowKindBatchReaderTest, TestInnerReaderContainsRowKind) {
+ std::string file_name = paimon::test::GetDataDir() +
+
"/orc/pk_table_scan_and_read_mor.db/pk_table_scan_and_read_mor/f1=20/"
+
"bucket-0/data-1bd5fd24-4e7d-4ea2-9436-86df0a54b14a-0.orc";
+ std::vector<DataField> read_fields = {SpecialFields::ValueKind(),
+ DataField(0, arrow::field("f0",
arrow::utf8())),
+ DataField(1, arrow::field("f1",
arrow::int32())),
+ DataField(2, arrow::field("f2",
arrow::int32())),
+ DataField(3, arrow::field("f3",
arrow::float64()))};
+ auto read_schema = DataField::ConvertDataFieldsToArrowSchema(read_fields);
+
+ auto reader = PrepareCompleteRowKindBatchReader(file_name, read_schema,
/*batch_size=*/1);
+ ASSERT_OK_AND_ASSIGN(auto result_array,
ReadResultCollector::CollectResult(reader.get()));
+
+ std::shared_ptr<arrow::ChunkedArray> expected_array;
+ auto array_status = arrow::ipc::internal::json::ChunkedArrayFromJSON(
+ arrow::struct_(read_schema->fields()), {R"([
+ [0, "Lucy", 20, 1, 14.1],
+ [0, "Paul", 20, 1, 18.1]
+])"},
+ &expected_array);
+ ASSERT_TRUE(array_status.ok());
+ ASSERT_TRUE(expected_array->Equals(*result_array));
+ reader->Close();
+}
+
+TEST_F(CompleteRowKindBatchReaderTest, TestNestedType) {
+ arrow::FieldVector fields = {
+ arrow::field("f0", arrow::list(arrow::struct_(
+ {field("a", arrow::int64()), field("b",
arrow::boolean())}))),
+ arrow::field("f1", arrow::map(arrow::struct_({field("a",
arrow::int64()),
+ field("b",
arrow::boolean())}),
+ arrow::boolean()))};
+ auto src_array =
arrow::ipc::internal::json::ArrayFromJSON(arrow::struct_(fields), R"([
+ [[null, [1, true], null], [[[1, true], true]]],
+ [[[2, false], null], null],
+ [[[2, false], [3, true], [4, null]], [[[1, true], true], [[5, false],
null]]],
+ [null, null]
+ ])")
+ .ValueOrDie();
+ ASSERT_TRUE(src_array);
+ auto reader = PrepareCompleteRowKindBatchReader(src_array,
/*batch_size=*/3);
+ ASSERT_OK_AND_ASSIGN(auto result_array,
ReadResultCollector::CollectResult(reader.get()));
+
+ std::shared_ptr<arrow::ChunkedArray> expected_array;
+ arrow::FieldVector read_fields = fields;
+ read_fields.insert(read_fields.begin(), arrow::field("_VALUE_KIND",
arrow::int8()));
+ auto array_status =
+
arrow::ipc::internal::json::ChunkedArrayFromJSON(arrow::struct_(read_fields),
{R"([
+ [0, [null, [1, true], null], [[[1, true], true]]],
+ [0, [[2, false], null], null],
+ [0, [[2, false], [3, true], [4, null]], [[[1, true], true], [[5,
false], null]]],
+ [0, null, null]
+])"},
+ &expected_array);
+ ASSERT_TRUE(array_status.ok());
+ ASSERT_TRUE(expected_array->Equals(*result_array));
+ reader->Close();
+}
+
+} // namespace paimon::test
diff --git a/src/paimon/common/reader/concat_batch_reader.cpp
b/src/paimon/common/reader/concat_batch_reader.cpp
new file mode 100644
index 0000000..c36886c
--- /dev/null
+++ b/src/paimon/common/reader/concat_batch_reader.cpp
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "paimon/common/reader/concat_batch_reader.h"
+
+#include <utility>
+
+#include "arrow/c/abi.h"
+#include "paimon/common/metrics/metrics_impl.h"
+#include "paimon/common/reader/reader_utils.h"
+#include "paimon/common/utils/arrow/mem_utils.h"
+
+namespace paimon {
+class MemoryPool;
+
+// Takes ownership of the child readers; reading starts at the first one (index 0).
+ConcatBatchReader::ConcatBatchReader(std::vector<std::unique_ptr<BatchReader>>&& readers,
+                                     const std::shared_ptr<MemoryPool>& pool)
+    : arrow_pool_(GetArrowPool(pool)),
+      readers_(std::move(readers)),
+      current_(0) {}
+
+Result<BatchReader::ReadBatch> ConcatBatchReader::NextBatch() {
+    // Plain-batch API: read with a bitmap, then materialize only the selected rows.
+    PAIMON_ASSIGN_OR_RAISE(BatchReader::ReadBatchWithBitmap selected, NextBatchWithBitmap());
+    return ReaderUtils::ApplyBitmapToReadBatch(std::move(selected), arrow_pool_.get());
+}
+
+void ConcatBatchReader::Close() {
+    // Readers before current_ were already closed when they reached eof; close the rest.
+    while (current_ < readers_.size()) {
+        readers_[current_]->Close();
+        ++current_;
+    }
+}
+
+std::shared_ptr<Metrics> ConcatBatchReader::GetReaderMetrics() const {
+    // Aggregate over every child reader, including those already closed (they stay in readers_).
+    auto merged = MetricsImpl::CollectReadMetrics(readers_);
+    return merged;
+}
+
+Result<BatchReader::ReadBatchWithBitmap> ConcatBatchReader::NextBatchWithBitmap() {
+    for (; current_ < readers_.size(); ++current_) {
+        PAIMON_ASSIGN_OR_RAISE(BatchReader::ReadBatchWithBitmap next_batch,
+                               readers_[current_]->NextBatchWithBitmap());
+        if (!BatchReader::IsEofBatch(next_batch)) {
+            // Stay on this reader: it may still have more batches.
+            return next_batch;
+        }
+        // This reader is exhausted; close it eagerly before advancing to the next one.
+        readers_[current_]->Close();
+    }
+    // Every child reader has been drained.
+    return BatchReader::MakeEofBatchWithBitmap();
+}
+
+} // namespace paimon
diff --git a/src/paimon/common/reader/concat_batch_reader.h
b/src/paimon/common/reader/concat_batch_reader.h
new file mode 100644
index 0000000..7a1b6a1
--- /dev/null
+++ b/src/paimon/common/reader/concat_batch_reader.h
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include <cstddef>
+#include <memory>
+#include <vector>
+
+#include "arrow/api.h"
+#include "paimon/metrics.h"
+#include "paimon/reader/batch_reader.h"
+#include "paimon/result.h"
+
+namespace paimon {
+class MemoryPool;
+
+/// Concatenates a list of BatchReaders and reads them one after another. Each child reader is
+/// drained, and closed on eof, before the next one is consulted.
+/// Callers are expected to pass readers already sorted by key and sequence number, with
+/// non-overlapping key intervals. This class only concatenates and does not verify that.
+class ConcatBatchReader : public BatchReader {
+ public:
+    /// @param readers child readers, consumed in vector order
+    /// @param pool memory pool used when NextBatch() has to materialize a bitmap selection
+    ConcatBatchReader(std::vector<std::unique_ptr<BatchReader>>&& readers,
+                      const std::shared_ptr<MemoryPool>& pool);
+
+    Result<ReadBatch> NextBatch() override;
+    Result<ReadBatchWithBitmap> NextBatchWithBitmap() override;
+    void Close() override;
+    std::shared_ptr<Metrics> GetReaderMetrics() const override;
+
+ private:
+    std::unique_ptr<arrow::MemoryPool> arrow_pool_;
+    std::vector<std::unique_ptr<BatchReader>> readers_;
+    // Index of the reader currently being drained; readers before it are already closed.
+    size_t current_ = 0;
+};
+} // namespace paimon
diff --git a/src/paimon/common/reader/concat_batch_reader_test.cpp
b/src/paimon/common/reader/concat_batch_reader_test.cpp
new file mode 100644
index 0000000..37a415d
--- /dev/null
+++ b/src/paimon/common/reader/concat_batch_reader_test.cpp
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "paimon/common/reader/concat_batch_reader.h"
+
+#include <algorithm>
+#include <cstdint>
+#include <string>
+#include <utility>
+
+#include "arrow/api.h"
+#include "arrow/array/array_base.h"
+#include "arrow/array/array_nested.h"
+#include "arrow/ipc/json_simple.h"
+#include "gtest/gtest.h"
+#include "paimon/memory/memory_pool.h"
+#include "paimon/status.h"
+#include "paimon/testing/mock/mock_file_batch_reader.h"
+#include "paimon/testing/utils/read_result_collector.h"
+#include "paimon/testing/utils/testharness.h"
+#include "paimon/utils/roaring_bitmap32.h"
+
+namespace paimon::test {
+class ConcatBatchReaderTest : public ::testing::Test {
+ protected:
+    void SetUp() override {
+        pool_ = GetDefaultPool();
+    }
+
+    /// Builds one reader per JSON int batch with every row selected, concatenates them, and
+    /// compares the collected output with `expected`. An empty `expected` means no data.
+    void CheckResult(const std::vector<std::string>& batches, const std::string& expected) {
+        std::vector<std::pair<std::string, std::vector<int32_t>>> batches_with_bitmap;
+        for (const auto& batch_str : batches) {
+            // Row count of a flat JSON int array: commas + 1, except for "[]".
+            auto row_count =
+                static_cast<int32_t>(std::count(batch_str.begin(), batch_str.end(), ','));
+            if (batch_str != "[]") {
+                row_count += 1;
+            }
+            std::vector<int32_t> bitmap_data;
+            for (int32_t i = 0; i < row_count; i++) {
+                bitmap_data.push_back(i);
+            }
+            batches_with_bitmap.emplace_back(batch_str, bitmap_data);
+        }
+        CheckResult(batches_with_bitmap, expected);
+    }
+
+    /// Builds one reader per (JSON batch, selected row ids) pair and checks the concatenated
+    /// result for several batch sizes. An empty `expected` means no data at all.
+    void CheckResult(const std::vector<std::pair<std::string, std::vector<int32_t>>>& batches,
+                     const std::string& expected) {
+        for (const auto& batch_size : {1, 2, 4, 8}) {
+            SCOPED_TRACE("batch_size=" + std::to_string(batch_size));
+            std::vector<std::unique_ptr<BatchReader>> readers;
+            for (const auto& [batch_str, bitmap_data] : batches) {
+                auto f1 = arrow::ipc::internal::json::ArrayFromJSON(arrow::int32(), batch_str)
+                              .ValueOrDie();
+                std::shared_ptr<arrow::Array> data =
+                    arrow::StructArray::Make({f1}, {arrow::field("f1", arrow::int32())})
+                        .ValueOrDie();
+                auto reader = std::make_unique<MockFileBatchReader>(
+                    data, data->type(), RoaringBitmap32::From(bitmap_data), batch_size);
+                readers.push_back(std::move(reader));
+            }
+            auto concat_reader = std::make_unique<ConcatBatchReader>(std::move(readers), pool_);
+            ASSERT_OK_AND_ASSIGN(auto result_chunk_array,
+                                 ReadResultCollector::CollectResult(concat_reader.get()));
+            concat_reader->Close();
+            if (expected.empty()) {
+                ASSERT_FALSE(result_chunk_array);
+                // Keep going: every batch size must be exercised, not only the first one.
+                continue;
+            }
+            // Guard the dereference in the failure message below.
+            ASSERT_TRUE(result_chunk_array) << "expected " << expected << " but got no data";
+            auto expected_f1 =
+                arrow::ipc::internal::json::ArrayFromJSON(arrow::int32(), expected).ValueOrDie();
+            std::shared_ptr<arrow::Array> expected_array =
+                arrow::StructArray::Make({expected_f1}, {arrow::field("f1", arrow::int32())})
+                    .ValueOrDie();
+            auto expected_chunk_array = std::make_shared<arrow::ChunkedArray>(expected_array);
+            ASSERT_TRUE(expected_chunk_array->Equals(result_chunk_array))
+                << result_chunk_array->ToString();
+        }
+    }
+
+    std::shared_ptr<MemoryPool> pool_;
+};
+TEST_F(ConcatBatchReaderTest, TestSimple) {
+    // Each case: the JSON batches fed to the child readers and the concatenated result.
+    // An empty expected string means the concat reader must produce no data.
+    const std::vector<std::pair<std::vector<std::string>, std::string>> cases = {
+        {{"[10, 11, 12, 13, 14]"}, "[10, 11, 12, 13, 14]"},
+        {{"[10, 11, 12, 13, 14]", "[16, 17, 20]", "[24]", "[100]"},
+         "[10, 11, 12, 13, 14, 16, 17, 20, 24, 100]"},
+        // an empty reader at the front, in the middle, and at the end
+        {{"[]", "[10, 11, 12, 13, 14]", "[16, 17, 20]", "[24]", "[100]"},
+         "[10, 11, 12, 13, 14, 16, 17, 20, 24, 100]"},
+        {{"[10, 11, 12, 13, 14]", "[]", "[16, 17, 20]", "[24]", "[100]"},
+         "[10, 11, 12, 13, 14, 16, 17, 20, 24, 100]"},
+        {{"[10, 11, 12, 13, 14]", "[16, 17, 20]", "[24]", "[100]", "[]"},
+         "[10, 11, 12, 13, 14, 16, 17, 20, 24, 100]"},
+        // no data in reader
+        {{"[]"}, ""},
+        // no reader
+        {std::vector<std::string>{}, ""},
+    };
+    for (const auto& [batches, expected] : cases) {
+        CheckResult(batches, expected);
+    }
+}
+
+TEST_F(ConcatBatchReaderTest, TestSimpleWithBitmap) {
+    // Each child reader is described by (JSON batch, selected row ids).
+    using BatchesWithBitmap = std::vector<std::pair<std::string, std::vector<int32_t>>>;
+    const std::pair<std::string, std::vector<int32_t>> empty_batch = {"[]", {}};
+
+    // Single reader with a partial selection.
+    CheckResult(BatchesWithBitmap{{"[10, 11, 12, 13, 14]", {0, 1, 3, 4}}}, "[10, 11, 13, 14]");
+
+    // Several readers, one of them with nothing selected.
+    const BatchesWithBitmap selected = {{"[10, 11, 12, 13, 14]", {1, 2, 3}},
+                                        {"[16, 17, 20]", {0, 2}},
+                                        {"[24]", {}},
+                                        {"[100]", {0}}};
+    const std::string selected_expected = "[11, 12, 13, 16, 20, 100]";
+    CheckResult(selected, selected_expected);
+
+    // An extra empty reader at the front, second position, or end must not change the result.
+    for (size_t insert_pos : {size_t{0}, size_t{1}, selected.size()}) {
+        BatchesWithBitmap with_empty = selected;
+        with_empty.insert(
+            with_empty.begin() + static_cast<BatchesWithBitmap::difference_type>(insert_pos),
+            empty_batch);
+        CheckResult(with_empty, selected_expected);
+    }
+
+    // no data in reader
+    CheckResult(BatchesWithBitmap{empty_batch, empty_batch}, "");
+    // no reader
+    CheckResult(BatchesWithBitmap{}, "");
+}
+
+} // namespace paimon::test
diff --git a/src/paimon/common/reader/delegating_prefetch_reader.h
b/src/paimon/common/reader/delegating_prefetch_reader.h
new file mode 100644
index 0000000..6d2b09c
--- /dev/null
+++ b/src/paimon/common/reader/delegating_prefetch_reader.h
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include <memory>
+#include <utility>
+#include <vector>
+
+#include "arrow/c/bridge.h"
+#include "arrow/type.h"
+#include "paimon/common/reader/prefetch_file_batch_reader_impl.h"
+#include "paimon/reader/file_batch_reader.h"
+
+namespace paimon {
+
+/// FileBatchReader facade over a PrefetchFileBatchReaderImpl.
+///
+/// Reading and query calls go to the prefetching reader itself when NeedPrefetch() is true.
+/// Otherwise they go directly to its first underlying reader. SetReadSchema() and Close()
+/// always go to the prefetching reader.
+class DelegatingPrefetchReader : public FileBatchReader {
+ public:
+    explicit DelegatingPrefetchReader(std::unique_ptr<PrefetchFileBatchReaderImpl> prefetch_reader)
+        : prefetch_reader_(std::move(prefetch_reader)) {}
+
+    /// Unsupported: this inner reader only serves the bitmap-based API.
+    Result<ReadBatch> NextBatch() override {
+        return Status::Invalid(
+            "paimon inner reader DelegatingPrefetchReader should use NextBatchWithBitmap");
+    }
+
+    Result<ReadBatchWithBitmap> NextBatchWithBitmap() override {
+        FileBatchReader* target = GetReader();
+        return target->NextBatchWithBitmap();
+    }
+
+    std::shared_ptr<Metrics> GetReaderMetrics() const override {
+        FileBatchReader* target = GetReader();
+        return target->GetReaderMetrics();
+    }
+
+    Result<std::unique_ptr<::ArrowSchema>> GetFileSchema() const override {
+        FileBatchReader* target = GetReader();
+        return target->GetFileSchema();
+    }
+
+    Status SetReadSchema(::ArrowSchema* read_schema, const std::shared_ptr<Predicate>& predicate,
+                         const std::optional<RoaringBitmap32>& selection_bitmap) override {
+        // Always configure the prefetching reader, regardless of NeedPrefetch().
+        return prefetch_reader_->SetReadSchema(read_schema, predicate, selection_bitmap);
+    }
+
+    Result<uint64_t> GetPreviousBatchFirstRowNumber() const override {
+        FileBatchReader* target = GetReader();
+        return target->GetPreviousBatchFirstRowNumber();
+    }
+
+    Result<uint64_t> GetNumberOfRows() const override {
+        FileBatchReader* target = GetReader();
+        return target->GetNumberOfRows();
+    }
+
+    void Close() override {
+        prefetch_reader_->Close();
+    }
+
+    bool SupportPreciseBitmapSelection() const override {
+        FileBatchReader* target = GetReader();
+        return target->SupportPreciseBitmapSelection();
+    }
+
+ private:
+    // Select the reader that should serve the current call.
+    // NOTE(review): relies on <cassert> being included transitively.
+    FileBatchReader* GetReader() const {
+        assert(prefetch_reader_);
+        if (!prefetch_reader_->NeedPrefetch()) {
+            return prefetch_reader_->GetFirstReader();
+        }
+        return prefetch_reader_.get();
+    }
+
+    std::unique_ptr<PrefetchFileBatchReaderImpl> prefetch_reader_;
+};
+
+} // namespace paimon
diff --git a/src/paimon/common/reader/predicate_batch_reader.cpp
b/src/paimon/common/reader/predicate_batch_reader.cpp
new file mode 100644
index 0000000..3bdea51
--- /dev/null
+++ b/src/paimon/common/reader/predicate_batch_reader.cpp
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "paimon/common/reader/predicate_batch_reader.h"
+
+#include <cassert>
+#include <cstddef>
+#include <cstdint>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include "arrow/array/array_base.h"
+#include "arrow/c/abi.h"
+#include "arrow/c/bridge.h"
+#include "arrow/memory_pool.h"
+#include "fmt/format.h"
+#include "paimon/common/predicate/predicate_filter.h"
+#include "paimon/common/reader/reader_utils.h"
+#include "paimon/common/utils/arrow/mem_utils.h"
+#include "paimon/common/utils/arrow/status_utils.h"
+#include "paimon/predicate/predicate.h"
+#include "paimon/status.h"
+
+namespace paimon {
+class MemoryPool;
+
+// Private constructor; instances are obtained through PredicateBatchReader::Create().
+PredicateBatchReader::PredicateBatchReader(std::unique_ptr<BatchReader>&& reader,
+                                           const std::shared_ptr<PredicateFilter>& predicate_filter,
+                                           const std::shared_ptr<MemoryPool>& pool)
+    : arrow_pool_(GetArrowPool(pool)), reader_(std::move(reader)), predicate_filter_(predicate_filter) {}
+
+/// Creates a reader that filters the batches of `reader` with `predicate`.
+/// @param reader the wrapped reader; must not be null
+/// @param predicate the filter; must not be null and must be a PredicateFilter
+/// @param pool memory pool used when NextBatch() materializes the selection
+/// @return Invalid status if any precondition above is violated
+Result<std::unique_ptr<PredicateBatchReader>> PredicateBatchReader::Create(
+    std::unique_ptr<BatchReader>&& reader, const std::shared_ptr<Predicate>& predicate,
+    const std::shared_ptr<MemoryPool>& pool) {
+    if (!predicate) {
+        return Status::Invalid("create predicate batch reader failed. predicate is nullptr");
+    }
+    // A null reader would otherwise only fail later, as a null dereference in
+    // NextBatchWithBitmap()/Close().
+    if (!reader) {
+        return Status::Invalid("create predicate batch reader failed. reader is nullptr");
+    }
+    auto predicate_filter = std::dynamic_pointer_cast<PredicateFilter>(predicate);
+    if (!predicate_filter) {
+        return Status::Invalid(
+            fmt::format("predicate {} does not support Test", predicate->ToString()));
+    }
+    // The constructor is private, so std::make_unique cannot be used here.
+    return std::unique_ptr<PredicateBatchReader>(
+        new PredicateBatchReader(std::move(reader), predicate_filter, pool));
+}
+
+Result<BatchReader::ReadBatch> PredicateBatchReader::NextBatch() {
+    // Plain-batch API: filter via the bitmap path, then copy out only the surviving rows.
+    PAIMON_ASSIGN_OR_RAISE(BatchReader::ReadBatchWithBitmap filtered, NextBatchWithBitmap());
+    return ReaderUtils::ApplyBitmapToReadBatch(std::move(filtered), arrow_pool_.get());
+}
+
+Result<BatchReader::ReadBatchWithBitmap> PredicateBatchReader::NextBatchWithBitmap() {
+    for (;;) {
+        PAIMON_ASSIGN_OR_RAISE(BatchReader::ReadBatchWithBitmap candidate,
+                               reader_->NextBatchWithBitmap());
+        if (BatchReader::IsEofBatch(candidate)) {
+            // Propagate eof from the wrapped reader unchanged.
+            return candidate;
+        }
+        auto& [read_batch, selection] = candidate;
+        auto& [c_array, c_schema] = read_batch;
+        assert(c_array);
+        // ImportArray takes over the contents of the C structs. If this batch is dropped below,
+        // the data is released when `imported` goes out of scope.
+        PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(std::shared_ptr<arrow::Array> imported,
+                                          arrow::ImportArray(c_array.get(), c_schema.get()));
+        PAIMON_ASSIGN_OR_RAISE(RoaringBitmap32 passing_rows, Filter(imported));
+        // Keep only rows that were already selected AND satisfy the predicate.
+        selection &= passing_rows;
+        if (!selection.IsEmpty()) {
+            // Hand the data back through the same C structs.
+            PAIMON_RETURN_NOT_OK_FROM_ARROW(
+                arrow::ExportArray(*imported, c_array.get(), c_schema.get()));
+            return candidate;
+        }
+        // Nothing survived in this batch: try the next one.
+    }
+}
+
+Result<RoaringBitmap32> PredicateBatchReader::Filter(
+    const std::shared_ptr<arrow::Array>& array) const {
+    // Test() yields one flag per row; collect the ids of the rows whose flag is set.
+    PAIMON_ASSIGN_OR_RAISE(std::vector<char> row_flags, predicate_filter_->Test(*array));
+    assert(row_flags.size() == static_cast<size_t>(array->length()));
+    RoaringBitmap32 passing_rows;
+    int32_t row_id = 0;
+    for (const char flag : row_flags) {
+        if (flag) {
+            passing_rows.Add(row_id);
+        }
+        ++row_id;
+    }
+    return passing_rows;
+}
+
+} // namespace paimon
diff --git a/src/paimon/common/reader/predicate_batch_reader.h
b/src/paimon/common/reader/predicate_batch_reader.h
new file mode 100644
index 0000000..ddfd72a
--- /dev/null
+++ b/src/paimon/common/reader/predicate_batch_reader.h
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include <memory>
+
+#include "arrow/memory_pool.h"
+#include "paimon/reader/batch_reader.h"
+#include "paimon/result.h"
+#include "paimon/utils/roaring_bitmap32.h"
+
+namespace arrow {
+class Array;
+} // namespace arrow
+
+namespace paimon {
+class MemoryPool;
+class Metrics;
+class Predicate;
+class PredicateFilter;
+
+/// BatchReader decorator that evaluates a predicate on every batch of the wrapped reader.
+/// It narrows each batch's selection bitmap to the rows that pass, and skips batches whose
+/// selection becomes empty. Instances are created through Create().
+class PredicateBatchReader : public BatchReader {
+ public:
+    static Result<std::unique_ptr<PredicateBatchReader>> Create(
+        std::unique_ptr<BatchReader>&& reader, const std::shared_ptr<Predicate>& predicate,
+        const std::shared_ptr<MemoryPool>& pool);
+
+    ~PredicateBatchReader() override = default;
+
+    Result<BatchReader::ReadBatch> NextBatch() override;
+    Result<BatchReader::ReadBatchWithBitmap> NextBatchWithBitmap() override;
+
+    // Lifecycle and metrics are delegated to the wrapped reader.
+    void Close() override {
+        reader_->Close();
+    }
+    std::shared_ptr<Metrics> GetReaderMetrics() const override {
+        return reader_->GetReaderMetrics();
+    }
+
+ private:
+    PredicateBatchReader(std::unique_ptr<BatchReader>&& reader,
+                         const std::shared_ptr<PredicateFilter>& predicate_filter,
+                         const std::shared_ptr<MemoryPool>& pool);
+
+    // Returns the row ids of `array` for which the predicate evaluates to true.
+    Result<RoaringBitmap32> Filter(const std::shared_ptr<arrow::Array>& array) const;
+
+    std::unique_ptr<arrow::MemoryPool> arrow_pool_;
+    std::unique_ptr<BatchReader> reader_;
+    std::shared_ptr<PredicateFilter> predicate_filter_;
+};
+} // namespace paimon
diff --git a/src/paimon/common/reader/predicate_batch_reader_test.cpp
b/src/paimon/common/reader/predicate_batch_reader_test.cpp
new file mode 100644
index 0000000..e23c257
--- /dev/null
+++ b/src/paimon/common/reader/predicate_batch_reader_test.cpp
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "paimon/common/reader/predicate_batch_reader.h"
+
+#include <cstdint>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include "arrow/api.h"
+#include "arrow/array/array_base.h"
+#include "arrow/array/builder_binary.h"
+#include "arrow/array/builder_nested.h"
+#include "arrow/array/builder_primitive.h"
+#include "arrow/ipc/json_simple.h"
+#include "gtest/gtest.h"
+#include "paimon/defs.h"
+#include "paimon/memory/memory_pool.h"
+#include "paimon/predicate/literal.h"
+#include "paimon/predicate/predicate_builder.h"
+#include "paimon/status.h"
+#include "paimon/testing/mock/mock_file_batch_reader.h"
+#include "paimon/testing/utils/read_result_collector.h"
+#include "paimon/testing/utils/testharness.h"
+
+namespace paimon {
+class Predicate;
+} // namespace paimon
+
+namespace paimon::test {
+class PredicateBatchReaderTest : public ::testing::Test {
+ public:
+    void SetUp() override {
+        fields_ = {arrow::field("f0", arrow::utf8()), arrow::field("f1", arrow::int64()),
+                   arrow::field("f2", arrow::boolean())};
+        data_type_ = arrow::struct_(fields_);
+    }
+
+    /// Builds `length` rows i = offset .. offset + length - 1 as ("str_<i>", i, i % 2 == 1).
+    std::shared_ptr<arrow::Array> PrepareArray(int32_t length, int32_t offset = 0) {
+        arrow::StructBuilder struct_builder(
+            data_type_, arrow::default_memory_pool(),
+            {std::make_shared<arrow::StringBuilder>(), std::make_shared<arrow::Int64Builder>(),
+             std::make_shared<arrow::BooleanBuilder>()});
+        auto string_builder = static_cast<arrow::StringBuilder*>(struct_builder.field_builder(0));
+        auto big_int_builder = static_cast<arrow::Int64Builder*>(struct_builder.field_builder(1));
+        auto bool_builder = static_cast<arrow::BooleanBuilder*>(struct_builder.field_builder(2));
+        for (int32_t i = 0 + offset; i < length + offset; ++i) {
+            EXPECT_TRUE(struct_builder.Append().ok());
+            EXPECT_TRUE(string_builder->Append("str_" + std::to_string(i)).ok());
+            EXPECT_TRUE(big_int_builder->Append(i).ok());
+            EXPECT_TRUE(bool_builder->Append(static_cast<bool>(i % 2)).ok());
+        }
+        std::shared_ptr<arrow::Array> array;
+        EXPECT_TRUE(struct_builder.Finish(&array).ok());
+        return array;
+    }
+
+    /// Wraps `reader` in a PredicateBatchReader for `predicate`, drains it, and compares the
+    /// collected rows with `expected_array`. A null `expected_array` means no rows.
+    void CheckResult(std::unique_ptr<BatchReader>&& reader,
+                     const std::shared_ptr<Predicate>& predicate,
+                     const std::shared_ptr<arrow::ChunkedArray>& expected_array) const {
+        ASSERT_OK_AND_ASSIGN(
+            auto predicate_reader,
+            PredicateBatchReader::Create(std::move(reader), predicate, GetDefaultPool()));
+        ASSERT_OK_AND_ASSIGN(std::shared_ptr<arrow::ChunkedArray> result_array,
+                             ReadResultCollector::CollectResult(predicate_reader.get()));
+        predicate_reader->Close();
+        if (expected_array) {
+            // A reader that unexpectedly produced no data must fail the assertion rather than
+            // crash on the dereference below.
+            ASSERT_TRUE(result_array) << "expected " << expected_array->ToString()
+                                      << " but got no data";
+            ASSERT_TRUE(result_array->Equals(expected_array)) << result_array->ToString();
+        } else {
+            ASSERT_FALSE(result_array);
+        }
+    }
+
+ protected:
+    // Protected (not private) so TEST_F bodies, which subclass this fixture, may use them.
+    arrow::FieldVector fields_;
+    std::shared_ptr<arrow::DataType> data_type_;
+};
+
+TEST_F(PredicateBatchReaderTest, TestSimple) {
+    // PrepareArray(100) holds rows 0..99 with f1 equal to the row index. Each predicate on f1
+    // therefore keeps exactly one contiguous slice [offset, offset + length) of the input.
+    auto data_array = PrepareArray(100);
+    struct SliceCase {
+        std::shared_ptr<Predicate> predicate;
+        int64_t offset;
+        int64_t length;
+    };
+    const std::vector<SliceCase> cases = {
+        {PredicateBuilder::LessThan(/*field_index=*/1, /*field_name=*/"f1", FieldType::BIGINT,
+                                    Literal(24L)),
+         0, 24},
+        {PredicateBuilder::LessThan(/*field_index=*/1, /*field_name=*/"f1", FieldType::BIGINT,
+                                    Literal(1L)),
+         0, 1},
+        {PredicateBuilder::LessThan(/*field_index=*/1, /*field_name=*/"f1", FieldType::BIGINT,
+                                    Literal(99L)),
+         0, 99},
+        {PredicateBuilder::GreaterThan(/*field_index=*/1, /*field_name=*/"f1", FieldType::BIGINT,
+                                       Literal(0L)),
+         1, 99},
+        {PredicateBuilder::GreaterThan(/*field_index=*/1, /*field_name=*/"f1", FieldType::BIGINT,
+                                       Literal(98L)),
+         99, 1},
+    };
+    for (const auto& slice_case : cases) {
+        auto reader =
+            std::make_unique<MockFileBatchReader>(data_array, data_type_, /*batch_size=*/10);
+        auto expected_array = std::make_shared<arrow::ChunkedArray>(
+            data_array->Slice(slice_case.offset, slice_case.length));
+        CheckResult(std::move(reader), slice_case.predicate, expected_array);
+    }
+}
+
+TEST_F(PredicateBatchReaderTest, TestVariousBatchSize) {
+    // Rows with f1 == -1 are noise interleaved with the rows PrepareArray(8) produces.
+    // Filtering on f1 > -1 must recover exactly those 8 rows for every batch size.
+    auto noisy_input = arrow::ipc::internal::json::ArrayFromJSON(data_type_, R"([
+        ["str_-1", -1, false],
+        ["str_0", 0, false], ["str_1", 1, true],
+        ["str_-1", -1, false],
+        ["str_2", 2, false], ["str_3", 3, true],
+        ["str_4", 4, false], ["str_5", 5, true], ["str_6", 6, false],
+        ["str_-1", -1, false],
+        ["str_7", 7, true],
+        ["str_-1", -1, false]
+    ])")
+                           .ValueOrDie();
+    const auto expected_array = std::make_shared<arrow::ChunkedArray>(PrepareArray(8));
+    const auto keep_non_negative = PredicateBuilder::GreaterThan(
+        /*field_index=*/1, /*field_name=*/"f1", FieldType::BIGINT, Literal(-1L));
+    for (const int32_t batch_size : {5, 10, 11, 20}) {
+        CheckResult(std::make_unique<MockFileBatchReader>(noisy_input, data_type_, batch_size),
+                    keep_non_negative, expected_array);
+    }
+}
+
+TEST_F(PredicateBatchReaderTest, TestOneByOneCase) {
+    // f2 is i % 2 for row i, so f2 == true keeps every other row: 1, 3, 5, 7.
+    auto data_array = PrepareArray(8);
+    auto reader = std::make_unique<MockFileBatchReader>(data_array, data_type_, /*batch_size=*/10);
+    auto predicate = PredicateBuilder::Equal(/*field_index=*/2, /*field_name=*/"f2",
+                                             FieldType::BOOLEAN, Literal(true));
+    std::shared_ptr<arrow::ChunkedArray> expected_array;
+    auto array_status = arrow::ipc::internal::json::ChunkedArrayFromJSON(data_type_, {R"([
+        ["str_1", 1, true], ["str_3", 3, true], ["str_5", 5, true], ["str_7", 7, true]
+    ])"},
+                                                                         &expected_array);
+    // If parsing failed, expected_array would stay null. CheckResult treats a null expectation
+    // as "no rows", which would turn this into a different, misleading test.
+    ASSERT_TRUE(array_status.ok()) << array_status.ToString();
+    CheckResult(std::move(reader), predicate, expected_array);
+}
+
+TEST_F(PredicateBatchReaderTest, TestFullAndEmptyCase) {
+    // f1 takes the values 0..14 here.
+    auto data_array = PrepareArray(15);
+    auto make_reader = [&]() {
+        return std::make_unique<MockFileBatchReader>(data_array, data_type_, /*batch_size=*/10);
+    };
+
+    // f1 < 20 keeps every row.
+    auto keep_all = PredicateBuilder::LessThan(/*field_index=*/1, /*field_name=*/"f1",
+                                               FieldType::BIGINT, Literal(20L));
+    CheckResult(make_reader(), keep_all, std::make_shared<arrow::ChunkedArray>(data_array));
+
+    // f1 > 20 drops every row, so no data is expected.
+    auto keep_none = PredicateBuilder::GreaterThan(/*field_index=*/1, /*field_name=*/"f1",
+                                                   FieldType::BIGINT, Literal(20L));
+    CheckResult(make_reader(), keep_none, nullptr);
+}
+
+TEST_F(PredicateBatchReaderTest, TestInvalidInput) {
+    // Create() must reject a null predicate.
+    std::unique_ptr<BatchReader> reader =
+        std::make_unique<MockFileBatchReader>(PrepareArray(8), data_type_, /*batch_size=*/10);
+    ASSERT_NOK_WITH_MSG(PredicateBatchReader::Create(std::move(reader), nullptr, GetDefaultPool()),
+                        "create predicate batch reader failed. predicate is nullptr");
+}
+
+} // namespace paimon::test
diff --git a/src/paimon/common/reader/reader_utils.cpp
b/src/paimon/common/reader/reader_utils.cpp
new file mode 100644
index 0000000..7bb28c7
--- /dev/null
+++ b/src/paimon/common/reader/reader_utils.cpp
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "paimon/common/reader/reader_utils.h"
+
+#include <cassert>
+#include <cstdint>
+#include <memory>
+#include <utility>
+
+#include "arrow/api.h"
+#include "arrow/array/array_base.h"
+#include "arrow/array/array_nested.h"
+#include "arrow/c/abi.h"
+#include "arrow/c/bridge.h"
+#include "arrow/c/helpers.h"
+#include "paimon/common/utils/arrow/status_utils.h"
+#include "paimon/status.h"
+#include "paimon/utils/roaring_bitmap32.h"
+
+namespace paimon {
+/// Splits `src_array` into slices, one per maximal run of consecutive row ids in `bitmap`.
+/// @param src_array the array to slice (slices share its buffers; no data is copied)
+/// @param bitmap selected row ids; must be non-empty and lie within [0, src_array->length())
+/// @return the slices in ascending row order, or Invalid if the bitmap is empty or holds an
+///         out-of-range row id
+Result<arrow::ArrayVector> ReaderUtils::GenerateFilteredArrayVector(
+    const std::shared_ptr<arrow::Array>& src_array, const RoaringBitmap32& bitmap) {
+    if (bitmap.Cardinality() == 0) {
+        return Status::Invalid("selection bitmap cannot be empty in GenerateFilteredArrayVector");
+    }
+    const int64_t array_length = src_array->length();
+    arrow::ArrayVector array_vec;
+    auto valid_iter = bitmap.Begin();
+    while (valid_iter != bitmap.End()) {
+        // Collect the maximal run of consecutive row ids [run_start, run_end).
+        const int64_t run_start = *valid_iter;
+        int64_t run_end = run_start;
+        while (valid_iter != bitmap.End() && static_cast<int64_t>(*valid_iter) == run_end) {
+            ++run_end;
+            ++valid_iter;
+        }
+        // Ids outside the array used to be truncated silently or to reach Array::Slice with an
+        // offset past the end; report them instead.
+        if (run_start < 0 || run_end > array_length) {
+            return Status::Invalid(
+                "selection bitmap contains row ids out of array range in "
+                "GenerateFilteredArrayVector");
+        }
+        array_vec.push_back(src_array->Slice(run_start, run_end - run_start));
+    }
+    assert(!array_vec.empty());
+    return array_vec;
+}
+
+void ReaderUtils::ReleaseReadBatch(BatchReader::ReadBatch&& batch) {
+    // Invoke the C data interface release callbacks for whichever halves are present. The
+    // smart pointers holding the structs are not reset here.
+    auto& [array_ptr, schema_ptr] = batch;
+    if (array_ptr != nullptr) {
+        ArrowArrayRelease(array_ptr.get());
+    }
+    if (schema_ptr != nullptr) {
+        ArrowSchemaRelease(schema_ptr.get());
+    }
+}
+
+/// Materializes the rows selected by the bitmap into a standalone read batch.
+/// @param batch_with_bitmap batch plus the ids of its valid rows; consumed by this call
+/// @param arrow_pool pool used when the selected rows have to be concatenated (copied)
+/// @return the eof batch unchanged, the original batch if every row is selected, or a new
+///         batch holding only the selected rows. Invalid if a non-eof bitmap is empty.
+Result<BatchReader::ReadBatch> ReaderUtils::ApplyBitmapToReadBatch(
+    BatchReader::ReadBatchWithBitmap&& batch_with_bitmap, arrow::MemoryPool* arrow_pool) {
+    if (BatchReader::IsEofBatch(batch_with_bitmap)) {
+        return std::move(batch_with_bitmap.first);
+    }
+    // Take ownership up front so the caller's object is consumed on every path.
+    BatchReader::ReadBatchWithBitmap moved_batch_with_bitmap = std::move(batch_with_bitmap);
+    auto& [batch, bitmap] = moved_batch_with_bitmap;
+    auto& [c_array, c_schema] = batch;
+    assert(c_array);
+    if (bitmap.IsEmpty()) {
+        // Release the C data before reporting the contract violation so it is not leaked.
+        ReleaseReadBatch(std::move(batch));
+        return Status::Invalid(
+            "NextBatchWithBitmap should always return the result with at least one valid row "
+            "except eof");
+    }
+    if (bitmap.Cardinality() == c_array->length) {
+        // Every row is selected (assuming the ids lie within the batch), so no copy is needed.
+        return std::move(batch);
+    }
+    // Import the C data, slice out the selected runs, and concatenate them into one array.
+    // This is the step that copies data.
+    PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(std::shared_ptr<arrow::Array> arrow_array,
+                                      arrow::ImportArray(c_array.get(), c_schema.get()));
+    PAIMON_ASSIGN_OR_RAISE(arrow::ArrayVector array_vec,
+                           GenerateFilteredArrayVector(arrow_array, bitmap));
+    PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(std::shared_ptr<arrow::Array> result,
+                                      arrow::Concatenate(array_vec, arrow_pool));
+    assert(result && result->length() > 0);
+    std::unique_ptr<ArrowArray> result_c_array = std::make_unique<ArrowArray>();
+    std::unique_ptr<ArrowSchema> result_c_schema = std::make_unique<ArrowSchema>();
+    PAIMON_RETURN_NOT_OK_FROM_ARROW(
+        arrow::ExportArray(*result, result_c_array.get(), result_c_schema.get()));
+    return std::make_pair(std::move(result_c_array), std::move(result_c_schema));
+}
+
+BatchReader::ReadBatchWithBitmap ReaderUtils::AddAllValidBitmap(BatchReader::ReadBatch&& batch) {
+    if (!BatchReader::IsEofBatch(batch)) {
+        // Select all `length` rows of the batch.
+        const int64_t num_rows = batch.first->length;
+        RoaringBitmap32 every_row;
+        every_row.AddRange(0, num_rows);
+        return std::make_pair(std::move(batch), std::move(every_row));
+    }
+    // Eof stays eof, just in the bitmap-carrying representation.
+    return BatchReader::MakeEofBatchWithBitmap();
+}
+
+} // namespace paimon
diff --git a/src/paimon/common/reader/reader_utils.h
b/src/paimon/common/reader/reader_utils.h
new file mode 100644
index 0000000..aa77e40
--- /dev/null
+++ b/src/paimon/common/reader/reader_utils.h
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include <memory>
+#include <string>
+
+#include "arrow/api.h"
+#include "paimon/reader/batch_reader.h"
+#include "paimon/result.h"
+
+namespace arrow {
+class MemoryPool;
+class Array;
+class StructArray;
+} // namespace arrow
+
+namespace paimon {
+class RoaringBitmap32;
+
+/// Static helpers for converting between plain read batches and batches paired with a
+/// valid-row bitmap. Not instantiable.
+class ReaderUtils {
+ public:
+    ReaderUtils() = delete;
+    ~ReaderUtils() = delete;
+
+    /// Keeps only the rows whose ids are set in the bitmap of `batch_with_bitmap`.
+    ///
+    /// @param batch_with_bitmap a read batch plus a bitmap holding the ids of its valid rows;
+    ///                          ownership of the batch is taken.
+    /// @param arrow_pool the arrow memory pool used when the valid rows have to be copied.
+    /// @return - an eof batch when the input batch is eof;
+    ///         - the input batch as-is (no copy) when the bitmap cardinality equals the
+    ///           batch length;
+    ///         - otherwise a newly built batch containing only the valid rows.
+    ///         An Invalid status is returned (and the input batch is released) when the
+    ///         batch is not eof but the bitmap is empty.
+    /// @note This function may copy data.
+    static Result<BatchReader::ReadBatch> ApplyBitmapToReadBatch(
+        BatchReader::ReadBatchWithBitmap&& batch_with_bitmap, arrow::MemoryPool* arrow_pool);
+
+    /// Pairs `batch` with a bitmap marking every one of its rows as valid.
+    ///
+    /// @param batch a read batch; ownership is taken, so the result must not be discarded.
+    /// @return the input batch together with an all-valid bitmap covering row ids
+    ///         [0, length), or an eof batch with bitmap when `batch` is eof.
+    [[nodiscard]] static BatchReader::ReadBatchWithBitmap AddAllValidBitmap(
+        BatchReader::ReadBatch&& batch);
+
+    /// Releases the c array and c schema held by `batch`.
+    static void ReleaseReadBatch(BatchReader::ReadBatch&& batch);
+
+    /// Splits `src_array` into sub-arrays that together hold the rows whose ids are set
+    /// in `bitmap`.
+    /// Precondition: `bitmap` is not empty.
+    static Result<arrow::ArrayVector> GenerateFilteredArrayVector(
+        const std::shared_ptr<arrow::Array>& src_array, const RoaringBitmap32& bitmap);
+};
+} // namespace paimon
diff --git a/src/paimon/common/reader/reader_utils_test.cpp b/src/paimon/common/reader/reader_utils_test.cpp
new file mode 100644
index 0000000..96b5a4d
--- /dev/null
+++ b/src/paimon/common/reader/reader_utils_test.cpp
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "paimon/common/reader/reader_utils.h"
+
+#include <cstdint>
+#include <memory>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include "arrow/api.h"
+#include "arrow/array/array_base.h"
+#include "arrow/c/abi.h"
+#include "arrow/ipc/json_simple.h"
+#include "gtest/gtest.h"
+#include "paimon/status.h"
+#include "paimon/testing/utils/read_result_collector.h"
+#include "paimon/testing/utils/testharness.h"
+#include "paimon/utils/roaring_bitmap32.h"
+
+namespace paimon::test {
+TEST(ReaderUtilsTest, TestAddAllValidBitmap) {
+    // A non-eof batch must come back unchanged, paired with a bitmap marking every row valid.
+    auto verify_non_eof = [](const std::string& json) {
+        auto expected =
+            arrow::ipc::internal::json::ArrayFromJSON(arrow::int32(), json).ValueOrDie();
+        ASSERT_OK_AND_ASSIGN(auto read_batch, ReadResultCollector::GetReadBatch(expected));
+        auto with_bitmap = ReaderUtils::AddAllValidBitmap(std::move(read_batch));
+        auto& [out_batch, valid_rows] = with_bitmap;
+        ASSERT_OK_AND_ASSIGN(auto actual, ReadResultCollector::GetArray(std::move(out_batch)));
+        ASSERT_EQ(valid_rows.Cardinality(), actual->length());
+        ASSERT_TRUE(actual->Equals(expected));
+    };
+    verify_non_eof("[0, 1, 2, 3, 4]");
+    verify_non_eof("[10, 20, 30]");
+
+    // An eof batch stays eof and carries an empty bitmap.
+    auto eof_with_bitmap = ReaderUtils::AddAllValidBitmap(BatchReader::MakeEofBatch());
+    ASSERT_TRUE(BatchReader::IsEofBatch(eof_with_bitmap));
+    ASSERT_TRUE(eof_with_bitmap.second.IsEmpty());
+}
+
+TEST(ReaderUtilsTest, TestApplyBitmapToReadBatch) {
+    // Applies the row ids in `bitmap_vec` to the int32 array parsed from `src_str`:
+    // - an empty `src_str` stands for an eof batch, which must come back as eof;
+    // - a non-empty `error_msg` means the call must fail with a message containing it;
+    // - otherwise the result must equal the int32 array parsed from `target_str`.
+    auto check_result = [](const std::string& src_str, const std::vector<int32_t>& bitmap_vec,
+                           const std::string& target_str, const std::string& error_msg = "") {
+        auto bitmap = RoaringBitmap32::From(bitmap_vec);
+        if (src_str.empty()) {
+            auto batch_with_bitmap = std::make_pair(BatchReader::MakeEofBatch(), std::move(bitmap));
+            ASSERT_OK_AND_ASSIGN(auto result_batch,
+                                 ReaderUtils::ApplyBitmapToReadBatch(std::move(batch_with_bitmap),
+                                                                     arrow::default_memory_pool()));
+            ASSERT_TRUE(BatchReader::IsEofBatch(result_batch));
+            return;
+        }
+        auto src_array =
+            arrow::ipc::internal::json::ArrayFromJSON(arrow::int32(), src_str).ValueOrDie();
+        auto target_array =
+            arrow::ipc::internal::json::ArrayFromJSON(arrow::int32(), target_str).ValueOrDie();
+        ASSERT_OK_AND_ASSIGN(auto src_batch, ReadResultCollector::GetReadBatch(src_array));
+        auto batch_with_bitmap = std::make_pair(std::move(src_batch), std::move(bitmap));
+        if (!error_msg.empty()) {
+            ASSERT_NOK_WITH_MSG(ReaderUtils::ApplyBitmapToReadBatch(std::move(batch_with_bitmap),
+                                                                    arrow::default_memory_pool()),
+                                error_msg);
+            return;
+        }
+        ASSERT_OK_AND_ASSIGN(auto result_batch,
+                             ReaderUtils::ApplyBitmapToReadBatch(std::move(batch_with_bitmap),
+                                                                 arrow::default_memory_pool()));
+        ASSERT_OK_AND_ASSIGN(auto result_array,
+                             ReadResultCollector::GetArray(std::move(result_batch)));
+        ASSERT_TRUE(result_array->Equals(target_array));
+    };
+    // a single valid row at the start, in the middle and at the end
+    check_result("[10, 11, 12, 13, 14]", {0}, "[10]");
+    check_result("[10, 11, 12, 13, 14]", {1}, "[11]");
+    check_result("[10, 11, 12, 13, 14]", {4}, "[14]");
+    // contiguous runs, including one touching the end
+    check_result("[10, 11, 12, 13, 14]", {0, 1}, "[10, 11]");
+    check_result("[10, 11, 12, 13, 14]", {2, 3}, "[12, 13]");
+    check_result("[10, 11, 12, 13, 14]", {3, 4}, "[13, 14]");
+    // runs separated by gaps
+    check_result("[10, 11, 12, 13, 14]", {2, 4}, "[12, 14]");
+    check_result("[10, 11, 12, 13, 14]", {0, 2, 4}, "[10, 12, 14]");
+    check_result("[10, 11, 12, 13, 14]", {0, 1, 3, 4}, "[10, 11, 13, 14]");
+    // every row valid: the batch is returned as-is
+    check_result("[10, 11, 12, 13, 14]", {0, 1, 2, 3, 4}, "[10, 11, 12, 13, 14]");
+    check_result("[7]", {0}, "[7]");
+    // eof batch
+    check_result("", {}, "");
+    // bitmap is empty, invalid
+    check_result("[10, 11, 12, 13, 14]", {}, "[]",
+                 "NextBatchWithBitmap should always return the result with at least one valid row "
+                 "except eof");
+}
+
+} // namespace paimon::test